1from __future__ import annotations
2
3import copy
4import warnings
5from collections.abc import Iterable, Iterator, Sequence
6from itertools import chain
7from typing import TYPE_CHECKING, Any, cast
8
9if TYPE_CHECKING:
10 from plain.models.meta import Meta
11 from plain.models.options import Options
12
13import plain.runtime
14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
15from plain.models import models_registry, transaction, types
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21 db_connection,
22)
23from plain.models.deletion import Collector
24from plain.models.exceptions import (
25 DoesNotExistDescriptor,
26 FieldDoesNotExist,
27 MultipleObjectsReturnedDescriptor,
28)
29from plain.models.expressions import RawSQL, Value
30from plain.models.fields import NOT_PROVIDED, Field
31from plain.models.fields.related import RelatedField
32from plain.models.fields.reverse_related import ForeignObjectRel
33from plain.models.meta import Meta
34from plain.models.options import Options
35from plain.models.query import F, Q, QuerySet
36from plain.preflight import PreflightResult
37from plain.utils.encoding import force_str
38from plain.utils.hashable import make_hashable
39
40
41class Deferred:
42 def __repr__(self) -> str:
43 return "<Deferred field>"
44
45 def __str__(self) -> str:
46 return "<Deferred field>"
47
48
49DEFERRED = Deferred()
50
51
52class ModelBase(type):
53 """Metaclass for all models."""
54
55 def __new__(
56 cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
57 ) -> type:
58 # Don't do any of this for the root models.Model class.
59 if not bases:
60 return super().__new__(cls, name, bases, attrs)
61
62 for base in bases:
63 # Models are required to directly inherit from model.Model, not a subclass of it.
64 if issubclass(base, Model) and base is not Model:
65 raise TypeError(
66 f"A model can't extend another model: {name} extends {base}"
67 )
68
69 return super().__new__(cls, name, bases, attrs, **kwargs)
70
71
72class ModelStateFieldsCacheDescriptor:
73 def __get__(
74 self, instance: ModelState | None, cls: type | None = None
75 ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
76 if instance is None:
77 return self
78 res = instance.fields_cache = {}
79 return res
80
81
82class ModelState:
83 """Store model instance state."""
84
85 # If true, uniqueness validation checks will consider this a new, unsaved
86 # object. Necessary for correct validation of new instances of objects with
87 # explicit (non-auto) PKs. This impacts validation only; it has no effect
88 # on the actual save.
89 adding = True
90 fields_cache = ModelStateFieldsCacheDescriptor()
91
92
93class Model(metaclass=ModelBase):
94 # Every model gets an automatic id field
95 id: int = types.PrimaryKeyField()
96
97 # Descriptors for other model behavior
98 query: QuerySet[Model] = QuerySet()
99 model_options: Options = Options()
100 _model_meta: Meta = Meta()
101 DoesNotExist = DoesNotExistDescriptor()
102 MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
103
104 def __init__(self, **kwargs: Any):
105 # Alias some things as locals to avoid repeat global lookups
106 cls = self.__class__
107 meta = cls._model_meta
108 _setattr = setattr
109 _DEFERRED = DEFERRED
110
111 # Set up the storage for instance state
112 self._state = ModelState()
113
114 # Process all fields from kwargs or use defaults
115 for field in meta.fields:
116 from plain.models.fields.related import RelatedField
117
118 is_related_object = False
119 # Virtual field
120 if field.attname not in kwargs and field.column is None:
121 continue
122 if isinstance(field, RelatedField) and isinstance(
123 field.remote_field, ForeignObjectRel
124 ):
125 try:
126 # Assume object instance was passed in.
127 rel_obj = kwargs.pop(field.name)
128 is_related_object = True
129 except KeyError:
130 try:
131 # Object instance wasn't passed in -- must be an ID.
132 val = kwargs.pop(field.attname)
133 except KeyError:
134 val = field.get_default()
135 else:
136 try:
137 val = kwargs.pop(field.attname)
138 except KeyError:
139 # This is done with an exception rather than the
140 # default argument on pop because we don't want
141 # get_default() to be evaluated, and then not used.
142 # Refs #12057.
143 val = field.get_default()
144
145 if is_related_object:
146 # If we are passed a related instance, set it using the
147 # field.name instead of field.attname (e.g. "user" instead of
148 # "user_id") so that the object gets properly cached (and type
149 # checked) by the RelatedObjectDescriptor.
150 if rel_obj is not _DEFERRED:
151 _setattr(self, field.name, rel_obj)
152 else:
153 if val is not _DEFERRED:
154 _setattr(self, field.attname, val)
155
156 # Handle any remaining kwargs (properties or virtual fields)
157 property_names = meta._property_names
158 unexpected = ()
159 for prop, value in kwargs.items():
160 # Any remaining kwargs must correspond to properties or virtual
161 # fields.
162 if prop in property_names:
163 if value is not _DEFERRED:
164 _setattr(self, prop, value)
165 else:
166 try:
167 meta.get_field(prop)
168 except FieldDoesNotExist:
169 unexpected += (prop,)
170 else:
171 if value is not _DEFERRED:
172 _setattr(self, prop, value)
173 if unexpected:
174 unexpected_names = ", ".join(repr(n) for n in unexpected)
175 raise TypeError(
176 f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
177 )
178
179 super().__init__()
180
181 @classmethod
182 def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
183 if len(values) != len(cls._model_meta.concrete_fields):
184 values_iter = iter(values)
185 values = [
186 next(values_iter) if f.attname in field_names else DEFERRED
187 for f in cls._model_meta.concrete_fields
188 ]
189 # Build kwargs dict from field names and values
190 field_dict = dict(
191 zip((f.attname for f in cls._model_meta.concrete_fields), values)
192 )
193 new = cls(**field_dict)
194 new._state.adding = False
195 return new
196
197 def __repr__(self) -> str:
198 return f"<{self.__class__.__name__}: {self.id}>"
199
200 def __str__(self) -> str:
201 return f"{self.__class__.__name__} object ({self.id})"
202
203 def __eq__(self, other: object) -> bool:
204 if not isinstance(other, Model):
205 return NotImplemented
206 if self.__class__ != other.__class__:
207 return False
208 my_id = self.id
209 if my_id is None:
210 return self is other
211 return my_id == other.id
212
213 def __hash__(self) -> int:
214 if self.id is None:
215 raise TypeError("Model instances without primary key value are unhashable")
216 return hash(self.id)
217
218 def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
219 data = self.__getstate__()
220 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
221 class_id = (
222 self.model_options.package_label,
223 self.model_options.object_name,
224 )
225 return model_unpickle, (class_id,), data
226
227 def __getstate__(self) -> dict[str, Any]:
228 """Hook to allow choosing the attributes to pickle."""
229 state = self.__dict__.copy()
230 state["_state"] = copy.copy(state["_state"])
231 state["_state"].fields_cache = state["_state"].fields_cache.copy()
232 # memoryview cannot be pickled, so cast it to bytes and store
233 # separately.
234 _memoryview_attrs = []
235 for attr, value in state.items():
236 if isinstance(value, memoryview):
237 _memoryview_attrs.append((attr, bytes(value)))
238 if _memoryview_attrs:
239 state["_memoryview_attrs"] = _memoryview_attrs
240 for attr, value in _memoryview_attrs:
241 state.pop(attr)
242 return state
243
244 def __setstate__(self, state: dict[str, Any]) -> None:
245 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
246 if pickled_version:
247 if pickled_version != plain.runtime.__version__:
248 warnings.warn(
249 f"Pickled model instance's Plain version {pickled_version} does not "
250 f"match the current version {plain.runtime.__version__}.",
251 RuntimeWarning,
252 stacklevel=2,
253 )
254 else:
255 warnings.warn(
256 "Pickled model instance's Plain version is not specified.",
257 RuntimeWarning,
258 stacklevel=2,
259 )
260 if "_memoryview_attrs" in state:
261 for attr, value in state.pop("_memoryview_attrs"):
262 state[attr] = memoryview(value)
263 self.__dict__.update(state)
264
265 def get_deferred_fields(self) -> set[str]:
266 """
267 Return a set containing names of deferred fields for this instance.
268 """
269 return {
270 f.attname
271 for f in self._model_meta.concrete_fields
272 if f.attname not in self.__dict__
273 }
274
275 def refresh_from_db(self, fields: list[str] | None = None) -> None:
276 """
277 Reload field values from the database.
278
279 By default, the reloading happens from the database this instance was
280 loaded from, or by the read router if this instance wasn't loaded from
281 any database. The using parameter will override the default.
282
283 Fields can be used to specify which fields to reload. The fields
284 should be an iterable of field attnames. If fields is None, then
285 all non-deferred fields are reloaded.
286
287 When accessing deferred fields of an instance, the deferred loading
288 of the field will call this method.
289 """
290 if fields is None:
291 self._prefetched_objects_cache = {}
292 else:
293 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", {})
294 for field in fields:
295 if field in prefetched_objects_cache:
296 del prefetched_objects_cache[field]
297 fields.remove(field)
298 if not fields:
299 return
300 if any(LOOKUP_SEP in f for f in fields):
301 raise ValueError(
302 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
303 "are not allowed in fields."
304 )
305
306 db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
307
308 # Use provided fields, if not set then reload all non-deferred fields.
309 deferred_fields = self.get_deferred_fields()
310 if fields is not None:
311 fields = list(fields)
312 db_instance_qs = db_instance_qs.only(*fields)
313 elif deferred_fields:
314 fields = [
315 f.attname
316 for f in self._model_meta.concrete_fields
317 if f.attname not in deferred_fields
318 ]
319 db_instance_qs = db_instance_qs.only(*fields)
320
321 db_instance = db_instance_qs.get()
322 non_loaded_fields = db_instance.get_deferred_fields()
323 for field in self._model_meta.concrete_fields:
324 if field.attname in non_loaded_fields:
325 # This field wasn't refreshed - skip ahead.
326 continue
327 setattr(self, field.attname, getattr(db_instance, field.attname))
328 # Clear cached foreign keys.
329 if isinstance(field, RelatedField) and field.is_cached(self):
330 field.delete_cached_value(self)
331
332 # Clear cached relations.
333 for field in self._model_meta.related_objects:
334 if field.is_cached(self):
335 field.delete_cached_value(self)
336
337 def serializable_value(self, field_name: str) -> Any:
338 """
339 Return the value of the field name for this instance. If the field is
340 a foreign key, return the id value instead of the object. If there's
341 no Field object with this name on the model, return the model
342 attribute's value.
343
344 Used to serialize a field's value (in the serializer, or form output,
345 for example). Normally, you would just access the attribute directly
346 and not use this method.
347 """
348 try:
349 field = self._model_meta.get_forward_field(field_name)
350 except FieldDoesNotExist:
351 return getattr(self, field_name)
352 return getattr(self, field.attname)
353
354 def save(
355 self,
356 *,
357 clean_and_validate: bool = True,
358 force_insert: bool = False,
359 force_update: bool = False,
360 update_fields: Iterable[str] | None = None,
361 ) -> None:
362 """
363 Save the current instance. Override this in a subclass if you want to
364 control the saving process.
365
366 The 'force_insert' and 'force_update' parameters can be used to insist
367 that the "save" must be an SQL insert or update (or equivalent for
368 non-SQL backends), respectively. Normally, they should not be set.
369 """
370 self._prepare_related_fields_for_save(operation_name="save")
371
372 if force_insert and (force_update or update_fields):
373 raise ValueError("Cannot force both insert and updating in model saving.")
374
375 deferred_fields = self.get_deferred_fields()
376 if update_fields is not None:
377 # If update_fields is empty, skip the save. We do also check for
378 # no-op saves later on for inheritance cases. This bailout is
379 # still needed for skipping signal sending.
380 if not update_fields:
381 return
382
383 update_fields = frozenset(update_fields)
384 field_names = self._model_meta._non_pk_concrete_field_names
385 non_model_fields = update_fields.difference(field_names)
386
387 if non_model_fields:
388 raise ValueError(
389 "The following fields do not exist in this model, are m2m "
390 "fields, or are non-concrete fields: {}".format(
391 ", ".join(non_model_fields)
392 )
393 )
394
395 # If this model is deferred, automatically do an "update_fields" save
396 # on the loaded fields.
397 elif not force_insert and deferred_fields:
398 field_names = set()
399 for field in self._model_meta.concrete_fields:
400 if not field.primary_key and not hasattr(field, "through"):
401 field_names.add(field.attname)
402 loaded_fields = field_names.difference(deferred_fields)
403 if loaded_fields:
404 update_fields = frozenset(loaded_fields)
405
406 if clean_and_validate:
407 self.full_clean(exclude=deferred_fields)
408
409 self.save_base(
410 force_insert=force_insert,
411 force_update=force_update,
412 update_fields=update_fields,
413 )
414
415 def save_base(
416 self,
417 *,
418 raw: bool = False,
419 force_insert: bool = False,
420 force_update: bool = False,
421 update_fields: Iterable[str] | None = None,
422 ) -> None:
423 """
424 Handle the parts of saving which should be done only once per save,
425 yet need to be done in raw saves, too. This includes some sanity
426 checks and signal sending.
427
428 The 'raw' argument is telling save_base not to save any parent
429 models and not to do any changes to the values before save. This
430 is used by fixture loading.
431 """
432 assert not (force_insert and (force_update or update_fields))
433 assert update_fields is None or update_fields
434 cls = self.__class__
435
436 with transaction.mark_for_rollback_on_error():
437 self._save_table(
438 raw=raw,
439 cls=cls,
440 force_insert=force_insert,
441 force_update=force_update,
442 update_fields=update_fields,
443 )
444 # Once saved, this is no longer a to-be-added instance.
445 self._state.adding = False
446
447 def _save_table(
448 self,
449 *,
450 raw: bool,
451 cls: type[Model],
452 force_insert: bool = False,
453 force_update: bool = False,
454 update_fields: Iterable[str] | None = None,
455 ) -> bool:
456 """
457 Do the heavy-lifting involved in saving. Update or insert the data
458 for a single table.
459 """
460 meta = cls._model_meta
461 non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
462
463 if update_fields:
464 non_pks = [
465 f
466 for f in non_pks
467 if f.name in update_fields or f.attname in update_fields
468 ]
469
470 id_val = self.id
471 if id_val is None:
472 id_field = meta.get_forward_field("id")
473 id_val = id_field.get_id_value_on_save(self)
474 setattr(self, id_field.attname, id_val)
475 id_set = id_val is not None
476 if not id_set and (force_update or update_fields):
477 raise ValueError("Cannot force an update in save() with no primary key.")
478 updated = False
479 # Skip an UPDATE when adding an instance and primary key has a default.
480 if (
481 not raw
482 and not force_insert
483 and self._state.adding
484 and meta.get_forward_field("id").default
485 and meta.get_forward_field("id").default is not NOT_PROVIDED
486 ):
487 force_insert = True
488 # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
489 if id_set and not force_insert:
490 base_qs = meta.base_queryset
491 values = [
492 (
493 f,
494 None,
495 (getattr(self, f.attname) if raw else f.pre_save(self, False)),
496 )
497 for f in non_pks
498 ]
499 forced_update = bool(update_fields or force_update)
500 updated = self._do_update(
501 base_qs, id_val, values, update_fields, forced_update
502 )
503 if force_update and not updated:
504 raise DatabaseError("Forced update did not affect any rows.")
505 if update_fields and not updated:
506 raise DatabaseError("Save with update_fields did not affect any rows.")
507 if not updated:
508 fields = meta.local_concrete_fields
509 if not id_set:
510 id_field = meta.get_forward_field("id")
511 fields = [f for f in fields if f is not id_field]
512
513 returning_fields = meta.db_returning_fields
514 results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
515 if results:
516 for value, field in zip(results[0], returning_fields):
517 setattr(self, field.attname, value)
518 return updated
519
520 def _do_update(
521 self,
522 base_qs: QuerySet,
523 id_val: Any,
524 values: list[tuple[Any, Any, Any]],
525 update_fields: Iterable[str] | None,
526 forced_update: bool,
527 ) -> bool:
528 """
529 Try to update the model. Return True if the model was updated (if an
530 update query was done and a matching row was found in the DB).
531 """
532 filtered = base_qs.filter(id=id_val)
533 if not values:
534 # We can end up here when saving a model in inheritance chain where
535 # update_fields doesn't target any field in current model. In that
536 # case we just say the update succeeded. Another case ending up here
537 # is a model with just PK - in that case check that the PK still
538 # exists.
539 return update_fields is not None or filtered.exists()
540 return filtered._update(values) > 0
541
542 def _do_insert(
543 self,
544 manager: QuerySet,
545 fields: Sequence[Any],
546 returning_fields: Sequence[Any],
547 raw: bool,
548 ) -> list[tuple[Any, ...]] | None:
549 """
550 Do an INSERT. If returning_fields is defined then this method should
551 return the newly created data for the model.
552 """
553 return manager._insert(
554 [self],
555 fields=list(fields),
556 returning_fields=list(returning_fields) if returning_fields else None,
557 raw=raw,
558 )
559
560 def _prepare_related_fields_for_save(
561 self, operation_name: str, fields: Sequence[Any] | None = None
562 ) -> None:
563 # Ensure that a model instance without a PK hasn't been assigned to
564 # a ForeignKeyField on this model. If the field is nullable, allowing the save would result in silent data loss.
565 for field in self._model_meta.concrete_fields:
566 if fields and field not in fields:
567 continue
568 # If the related field isn't cached, then an instance hasn't been
569 # assigned and there's no need to worry about this check.
570 if isinstance(field, RelatedField) and field.is_cached(self):
571 obj = getattr(self, field.name, None)
572 if not obj:
573 continue
574 # A pk may have been assigned manually to a model instance not
575 # saved to the database (or auto-generated in a case like
576 # UUIDField), but we allow the save to proceed and rely on the
577 # database to raise an IntegrityError if applicable. If
578 # constraints aren't supported by the database, there's the
579 # unavoidable risk of data corruption.
580 if obj.id is None:
581 # Remove the object from a related instance cache.
582 if not field.remote_field.multiple:
583 field.remote_field.delete_cached_value(obj)
584 raise ValueError(
585 f"{operation_name}() prohibited to prevent data loss due to unsaved "
586 f"related object '{field.name}'."
587 )
588 elif getattr(self, field.attname) in field.empty_values:
589 # Set related object if it has been saved after an
590 # assignment.
591 setattr(self, field.name, obj)
592 # If the relationship's pk/to_field was changed, clear the
593 # cached relationship.
594 if getattr(obj, field.target_field.attname) != getattr(
595 self, field.attname
596 ):
597 field.delete_cached_value(self)
598
599 def delete(self) -> tuple[int, dict[str, int]]:
600 if self.id is None:
601 raise ValueError(
602 f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
603 "to None."
604 )
605 collector = Collector(origin=self)
606 collector.collect([self])
607 return collector.delete()
608
609 def get_field_display(self, field_name: str) -> str:
610 """Get the display value for a field, especially useful for fields with choices."""
611 # Get the field object from the field name
612 field = self._model_meta.get_forward_field(field_name)
613 value = getattr(self, field.attname)
614
615 # If field has no choices, just return the value as string
616 if not hasattr(field, "flatchoices") or not field.flatchoices:
617 return force_str(value, strings_only=True)
618
619 # For fields with choices, look up the display value
620 choices_dict = dict(make_hashable(field.flatchoices))
621 return force_str(
622 choices_dict.get(make_hashable(value), value), strings_only=True
623 )
624
625 def _get_field_value_map(
626 self, meta: Meta | None, exclude: set[str] | None = None
627 ) -> dict[str, Value]:
628 if exclude is None:
629 exclude = set()
630 meta = meta or self._model_meta
631 return {
632 field.name: Value(getattr(self, field.attname), field)
633 for field in meta.local_concrete_fields
634 if field.name not in exclude
635 }
636
637 def prepare_database_save(self, field: Any) -> Any:
638 if self.id is None:
639 raise ValueError(
640 f"Unsaved model instance {self!r} cannot be used in an ORM query."
641 )
642 return getattr(self, field.remote_field.get_related_field().attname)
643
644 def clean(self) -> None:
645 """
646 Hook for doing any extra model-wide validation after clean() has been
647 called on every field by self.clean_fields. Any ValidationError raised
648 by this method will not be associated with a particular field; it will
649 have a special-case association with the field defined by NON_FIELD_ERRORS.
650 """
651 pass
652
653 def validate_unique(self, exclude: set[str] | None = None) -> None:
654 """
655 Check unique constraints on the model and raise ValidationError if any
656 failed.
657 """
658 unique_checks = self._get_unique_checks(exclude=exclude)
659
660 if errors := self._perform_unique_checks(unique_checks):
661 raise ValidationError(errors)
662
663 def _get_unique_checks(
664 self, exclude: set[str] | None = None
665 ) -> list[tuple[type[Model], tuple[str, ...]]]:
666 """
667 Return a list of checks to perform. Since validate_unique() could be
668 called from a ModelForm, some fields may have been excluded; we can't
669 perform a unique check on a model that is missing fields involved
670 in that check. Fields that did not validate should also be excluded,
671 but they need to be passed in via the exclude argument.
672 """
673 if exclude is None:
674 exclude = set()
675 unique_checks = []
676
677 # Gather a list of checks for fields declared as unique and add them to
678 # the list of checks.
679
680 fields_with_class = [(self.__class__, self._model_meta.local_fields)]
681
682 for model_class, fields in fields_with_class:
683 for f in fields:
684 name = f.name
685 if name in exclude:
686 continue
687 if f.primary_key:
688 unique_checks.append((model_class, (name,)))
689
690 return unique_checks
691
692 def _perform_unique_checks(
693 self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
694 ) -> dict[str, list[ValidationError]]:
695 errors = {}
696
697 for model_class, unique_check in unique_checks:
698 # Try to look up an existing object with the same values as this
699 # object's values for all the unique field.
700
701 lookup_kwargs = {}
702 for field_name in unique_check:
703 f = self._model_meta.get_forward_field(field_name)
704 lookup_value = getattr(self, f.attname)
705 # TODO: Handle multiple backends with different feature flags.
706 if lookup_value is None:
707 # no value, skip the lookup
708 continue
709 if f.primary_key and not self._state.adding:
710 # no need to check for unique primary key when editing
711 continue
712 lookup_kwargs[str(field_name)] = lookup_value
713
714 # some fields were skipped, no reason to do the check
715 if len(unique_check) != len(lookup_kwargs):
716 continue
717
718 qs = model_class.query.filter(**lookup_kwargs)
719
720 # Exclude the current object from the query if we are editing an
721 # instance (as opposed to creating a new one)
722 # Use the primary key defined by model_class. In previous versions
723 # this could differ from `self.id` due to model inheritance.
724 model_class_id = getattr(self, "id")
725 if not self._state.adding and model_class_id is not None:
726 qs = qs.exclude(id=model_class_id)
727 if qs.exists():
728 if len(unique_check) == 1:
729 key = unique_check[0]
730 else:
731 key = NON_FIELD_ERRORS
732 errors.setdefault(key, []).append(
733 self.unique_error_message(model_class, unique_check)
734 )
735
736 return errors
737
738 def unique_error_message(
739 self, model_class: type[Model], unique_check: tuple[str, ...]
740 ) -> ValidationError:
741 meta = model_class._model_meta
742
743 params = {
744 "model": self,
745 "model_class": model_class,
746 "model_name": model_class.model_options.model_name,
747 "unique_check": unique_check,
748 }
749
750 if len(unique_check) == 1:
751 field = meta.get_forward_field(unique_check[0])
752 params["field_label"] = field.name
753 return ValidationError(
754 message=field.error_messages["unique"],
755 code="unique",
756 params=params,
757 )
758 else:
759 field_names = [meta.get_forward_field(f).name for f in unique_check]
760
761 # Put an "and" before the last one
762 field_names[-1] = f"and {field_names[-1]}"
763
764 if len(field_names) > 2:
765 # Comma join if more than 2
766 params["field_label"] = ", ".join(cast(list[str], field_names))
767 else:
768 # Just a space if there are only 2
769 params["field_label"] = " ".join(cast(list[str], field_names))
770
771 # Use the first field as the message format...
772 message = meta.get_forward_field(unique_check[0]).error_messages["unique"]
773
774 return ValidationError(
775 message=message,
776 code="unique",
777 params=params,
778 )
779
780 def get_constraints(self) -> list[tuple[type[Model], list[Any]]]:
781 constraints = [(self.__class__, list(self.model_options.constraints))]
782 return constraints
783
784 def validate_constraints(self, exclude: set[str] | None = None) -> None:
785 constraints = self.get_constraints()
786
787 errors = {}
788 for model_class, model_constraints in constraints:
789 for constraint in model_constraints:
790 try:
791 constraint.validate(model_class, self, exclude=exclude)
792 except ValidationError as e:
793 if (
794 getattr(e, "code", None) == "unique"
795 and len(constraint.fields) == 1
796 ):
797 errors.setdefault(constraint.fields[0], []).append(e)
798 else:
799 errors = e.update_error_dict(errors)
800 if errors:
801 raise ValidationError(errors)
802
803 def full_clean(
804 self,
805 *,
806 exclude: set[str] | Iterable[str] | None = None,
807 validate_unique: bool = True,
808 validate_constraints: bool = True,
809 ) -> None:
810 """
811 Call clean_fields(), clean(), validate_unique(), and
812 validate_constraints() on the model. Raise a ValidationError for any
813 errors that occur.
814 """
815 errors = {}
816 if exclude is None:
817 exclude = set()
818 else:
819 exclude = set(exclude)
820
821 try:
822 self.clean_fields(exclude=exclude)
823 except ValidationError as e:
824 errors = e.update_error_dict(errors)
825
826 # Form.clean() is run even if other validation fails, so do the
827 # same with Model.clean() for consistency.
828 try:
829 self.clean()
830 except ValidationError as e:
831 errors = e.update_error_dict(errors)
832
833 # Run unique checks, but only for fields that passed validation.
834 if validate_unique:
835 for name in errors:
836 if name != NON_FIELD_ERRORS and name not in exclude:
837 exclude.add(name)
838 try:
839 self.validate_unique(exclude=exclude)
840 except ValidationError as e:
841 errors = e.update_error_dict(errors)
842
843 # Run constraints checks, but only for fields that passed validation.
844 if validate_constraints:
845 for name in errors:
846 if name != NON_FIELD_ERRORS and name not in exclude:
847 exclude.add(name)
848 try:
849 self.validate_constraints(exclude=exclude)
850 except ValidationError as e:
851 errors = e.update_error_dict(errors)
852
853 if errors:
854 raise ValidationError(errors)
855
856 def clean_fields(self, exclude: set[str] | None = None) -> None:
857 """
858 Clean all fields and raise a ValidationError containing a dict
859 of all validation errors if any occur.
860 """
861 if exclude is None:
862 exclude = set()
863
864 errors = {}
865 for f in self._model_meta.fields:
866 if f.name in exclude:
867 continue
868 # Skip validation for empty fields with required=False. The developer
869 # is responsible for making sure they have a valid value.
870 raw_value = getattr(self, f.attname)
871 if not f.required and raw_value in f.empty_values:
872 continue
873 try:
874 setattr(self, f.attname, f.clean(raw_value, self))
875 except ValidationError as e:
876 errors[f.name] = e.error_list
877
878 if errors:
879 raise ValidationError(errors)
880
881 @classmethod
882 def preflight(cls) -> list[PreflightResult]:
883 errors: list[PreflightResult] = []
884
885 errors += [
886 *cls._check_fields(),
887 *cls._check_m2m_through_same_relationship(),
888 *cls._check_long_column_names(),
889 ]
890 clash_errors = (
891 *cls._check_id_field(),
892 *cls._check_field_name_clashes(),
893 *cls._check_model_name_db_lookup_clashes(),
894 *cls._check_property_name_related_field_accessor_clashes(),
895 *cls._check_single_primary_key(),
896 )
897 errors.extend(clash_errors)
898 # If there are field name clashes, hide consequent column name
899 # clashes.
900 if not clash_errors:
901 errors.extend(cls._check_column_name_clashes())
902 errors += [
903 *cls._check_indexes(),
904 *cls._check_ordering(),
905 *cls._check_constraints(),
906 ]
907
908 return errors
909
910 @classmethod
911 def _check_fields(cls) -> list[PreflightResult]:
912 """Perform all field checks."""
913 errors: list[PreflightResult] = []
914 for field in cls._model_meta.local_fields:
915 errors.extend(field.preflight(from_model=cls))
916 for field in cls._model_meta.local_many_to_many:
917 errors.extend(field.preflight(from_model=cls))
918 return errors
919
920 @classmethod
921 def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
922 """Check if no relationship model is used by more than one m2m field."""
923
924 errors: list[PreflightResult] = []
925 seen_intermediary_signatures = []
926
927 fields = cls._model_meta.local_many_to_many
928
929 # Skip when the target model wasn't found.
930 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
931
932 # Skip when the relationship model wasn't found.
933 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
934
935 for f in fields:
936 signature = (
937 f.remote_field.model,
938 cls,
939 f.remote_field.through,
940 f.remote_field.through_fields,
941 )
942 if signature in seen_intermediary_signatures:
943 errors.append(
944 PreflightResult(
945 fix="The model has two identical many-to-many relations "
946 f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
947 obj=cls,
948 id="models.duplicate_many_to_many_relations",
949 )
950 )
951 else:
952 seen_intermediary_signatures.append(signature)
953 return errors
954
955 @classmethod
956 def _check_id_field(cls) -> list[PreflightResult]:
957 """Disallow user-defined fields named ``id``."""
958 if any(
959 f
960 for f in cls._model_meta.local_fields
961 if f.name == "id" and not f.auto_created
962 ):
963 return [
964 PreflightResult(
965 fix="'id' is a reserved word that cannot be used as a field name.",
966 obj=cls,
967 id="models.reserved_field_name_id",
968 )
969 ]
970 return []
971
972 @classmethod
973 def _check_field_name_clashes(cls) -> list[PreflightResult]:
974 """Forbid field shadowing in multi-table inheritance."""
975 errors: list[PreflightResult] = []
976 used_fields = {} # name or attname -> field
977
978 for f in cls._model_meta.local_fields:
979 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
980 # Note that we may detect clash between user-defined non-unique
981 # field "id" and automatically added unique field "id", both
982 # defined at the same model. This special case is considered in
983 # _check_id_field and here we ignore it.
984 id_conflict = (
985 f.name == "id" and clash and clash.name == "id" and clash.model == cls
986 )
987 if clash and not id_conflict:
988 errors.append(
989 PreflightResult(
990 fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
991 f"from model '{clash.model.model_options}'.",
992 obj=f,
993 id="models.field_name_clash",
994 )
995 )
996 used_fields[f.name] = f
997 used_fields[f.attname] = f
998
999 return errors
1000
1001 @classmethod
1002 def _check_column_name_clashes(cls) -> list[PreflightResult]:
1003 # Store a list of column names which have already been used by other fields.
1004 used_column_names: list[str] = []
1005 errors: list[PreflightResult] = []
1006
1007 for f in cls._model_meta.local_fields:
1008 column_name = f.column
1009
1010 # Ensure the column name is not already in use.
1011 if column_name and column_name in used_column_names:
1012 errors.append(
1013 PreflightResult(
1014 fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1015 "another field.",
1016 obj=cls,
1017 id="models.db_column_clash",
1018 )
1019 )
1020 else:
1021 used_column_names.append(column_name)
1022
1023 return errors
1024
1025 @classmethod
1026 def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1027 errors: list[PreflightResult] = []
1028 model_name = cls.__name__
1029 if model_name.startswith("_") or model_name.endswith("_"):
1030 errors.append(
1031 PreflightResult(
1032 fix=f"The model name '{model_name}' cannot start or end with an underscore "
1033 "as it collides with the query lookup syntax.",
1034 obj=cls,
1035 id="models.model_name_underscore_bounds",
1036 )
1037 )
1038 elif LOOKUP_SEP in model_name:
1039 errors.append(
1040 PreflightResult(
1041 fix=f"The model name '{model_name}' cannot contain double underscores as "
1042 "it collides with the query lookup syntax.",
1043 obj=cls,
1044 id="models.model_name_double_underscore",
1045 )
1046 )
1047 return errors
1048
1049 @classmethod
1050 def _check_property_name_related_field_accessor_clashes(
1051 cls,
1052 ) -> list[PreflightResult]:
1053 errors: list[PreflightResult] = []
1054 property_names = cls._model_meta._property_names
1055 related_field_accessors = (
1056 f.get_attname()
1057 for f in cls._model_meta._get_fields(reverse=False)
1058 if isinstance(f, RelatedField)
1059 )
1060 for accessor in related_field_accessors:
1061 if accessor in property_names:
1062 errors.append(
1063 PreflightResult(
1064 fix=f"The property '{accessor}' clashes with a related field "
1065 "accessor.",
1066 obj=cls,
1067 id="models.property_related_field_clash",
1068 )
1069 )
1070 return errors
1071
1072 @classmethod
1073 def _check_single_primary_key(cls) -> list[PreflightResult]:
1074 errors: list[PreflightResult] = []
1075 if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1076 errors.append(
1077 PreflightResult(
1078 fix="The model cannot have more than one field with "
1079 "'primary_key=True'.",
1080 obj=cls,
1081 id="models.multiple_primary_keys",
1082 )
1083 )
1084 return errors
1085
1086 @classmethod
1087 def _check_indexes(cls) -> list[PreflightResult]:
1088 """Check fields, names, and conditions of indexes."""
1089 errors: list[PreflightResult] = []
1090 references: set[str] = set()
1091 for index in cls.model_options.indexes:
1092 # Index name can't start with an underscore or a number, restricted
1093 # for cross-database compatibility with Oracle.
1094 if index.name[0] == "_" or index.name[0].isdigit():
1095 errors.append(
1096 PreflightResult(
1097 fix=f"The index name '{index.name}' cannot start with an underscore "
1098 "or a number.",
1099 obj=cls,
1100 id="models.index_name_invalid_start",
1101 ),
1102 )
1103 if len(index.name) > index.max_name_length:
1104 errors.append(
1105 PreflightResult(
1106 fix="The index name '%s' cannot be longer than %d " # noqa: UP031
1107 "characters." % (index.name, index.max_name_length),
1108 obj=cls,
1109 id="models.index_name_too_long",
1110 ),
1111 )
1112 if index.contains_expressions:
1113 for expression in index.expressions:
1114 references.update(
1115 ref[0] for ref in cls._get_expr_references(expression)
1116 )
1117 if not (
1118 db_connection.features.supports_partial_indexes
1119 or "supports_partial_indexes" in cls.model_options.required_db_features
1120 ) and any(index.condition is not None for index in cls.model_options.indexes):
1121 errors.append(
1122 PreflightResult(
1123 fix=f"{db_connection.display_name} does not support indexes with conditions. "
1124 "Conditions will be ignored. Silence this warning "
1125 "if you don't care about it.",
1126 warning=True,
1127 obj=cls,
1128 id="models.index_conditions_ignored",
1129 )
1130 )
1131 if not (
1132 db_connection.features.supports_covering_indexes
1133 or "supports_covering_indexes" in cls.model_options.required_db_features
1134 ) and any(index.include for index in cls.model_options.indexes):
1135 errors.append(
1136 PreflightResult(
1137 fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
1138 "Non-key columns will be ignored. Silence this "
1139 "warning if you don't care about it.",
1140 warning=True,
1141 obj=cls,
1142 id="models.index_non_key_columns_ignored",
1143 )
1144 )
1145 if not (
1146 db_connection.features.supports_expression_indexes
1147 or "supports_expression_indexes" in cls.model_options.required_db_features
1148 ) and any(index.contains_expressions for index in cls.model_options.indexes):
1149 errors.append(
1150 PreflightResult(
1151 fix=f"{db_connection.display_name} does not support indexes on expressions. "
1152 "An index won't be created. Silence this warning "
1153 "if you don't care about it.",
1154 warning=True,
1155 obj=cls,
1156 id="models.index_on_foreign_key",
1157 )
1158 )
1159 fields = [
1160 field
1161 for index in cls.model_options.indexes
1162 for field, _ in index.fields_orders
1163 ]
1164 fields += [
1165 include for index in cls.model_options.indexes for include in index.include
1166 ]
1167 fields += references
1168 errors.extend(cls._check_local_fields(fields, "indexes"))
1169 return errors
1170
1171 @classmethod
1172 def _check_local_fields(
1173 cls, fields: Iterable[str], option: str
1174 ) -> list[PreflightResult]:
1175 # In order to avoid hitting the relation tree prematurely, we use our
1176 # own fields_map instead of using get_field()
1177 forward_fields_map: dict[str, Field] = {}
1178 for field in cls._model_meta._get_fields(reverse=False):
1179 forward_fields_map[field.name] = field
1180 if hasattr(field, "attname"):
1181 forward_fields_map[field.attname] = field
1182
1183 errors: list[PreflightResult] = []
1184 for field_name in fields:
1185 try:
1186 field = forward_fields_map[field_name]
1187 except KeyError:
1188 errors.append(
1189 PreflightResult(
1190 fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1191 obj=cls,
1192 id="models.nonexistent_field_reference",
1193 )
1194 )
1195 else:
1196 from plain.models.fields.related import ManyToManyField
1197
1198 if isinstance(field, ManyToManyField):
1199 errors.append(
1200 PreflightResult(
1201 fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1202 f"ManyToManyFields are not permitted in '{option}'.",
1203 obj=cls,
1204 id="models.m2m_field_in_meta_option",
1205 )
1206 )
1207 elif field not in cls._model_meta.local_fields:
1208 errors.append(
1209 PreflightResult(
1210 fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1211 f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1212 obj=cls,
1213 id="models.non_local_field_reference",
1214 )
1215 )
1216 return errors
1217
1218 @classmethod
1219 def _check_ordering(cls) -> list[PreflightResult]:
1220 """
1221 Check "ordering" option -- is it a list of strings and do all fields
1222 exist?
1223 """
1224
1225 if not cls.model_options.ordering:
1226 return []
1227
1228 if not isinstance(cls.model_options.ordering, list | tuple):
1229 return [
1230 PreflightResult(
1231 fix="'ordering' must be a tuple or list (even if you want to order by "
1232 "only one field).",
1233 obj=cls,
1234 id="models.ordering_not_tuple_or_list",
1235 )
1236 ]
1237
1238 errors: list[PreflightResult] = []
1239 fields = cls.model_options.ordering
1240
1241 # Skip expressions and '?' fields.
1242 fields = (f for f in fields if isinstance(f, str) and f != "?")
1243
1244 # Convert "-field" to "field".
1245 fields = (f.removeprefix("-") for f in fields)
1246
1247 # Separate related fields and non-related fields.
1248 _fields = []
1249 related_fields = []
1250 for f in fields:
1251 if LOOKUP_SEP in f:
1252 related_fields.append(f)
1253 else:
1254 _fields.append(f)
1255 fields = _fields
1256
1257 # Check related fields.
1258 for field in related_fields:
1259 _cls = cls
1260 fld = None
1261 for part in field.split(LOOKUP_SEP):
1262 try:
1263 fld = _cls._model_meta.get_field(part)
1264 if isinstance(fld, RelatedField):
1265 _cls = fld.path_infos[-1].to_meta.model
1266 else:
1267 _cls = None
1268 except (FieldDoesNotExist, AttributeError):
1269 if fld is None or (
1270 not isinstance(fld, Field)
1271 or (
1272 fld.get_transform(part) is None
1273 and fld.get_lookup(part) is None
1274 )
1275 ):
1276 errors.append(
1277 PreflightResult(
1278 fix="'ordering' refers to the nonexistent field, "
1279 f"related field, or lookup '{field}'.",
1280 obj=cls,
1281 id="models.ordering_nonexistent_field",
1282 )
1283 )
1284
1285 # Check for invalid or nonexistent fields in ordering.
1286 invalid_fields = []
1287
1288 # Any field name that is not present in field_names does not exist.
1289 # Also, ordering by m2m fields is not allowed.
1290 meta = cls._model_meta
1291 valid_fields = set(
1292 chain.from_iterable(
1293 (f.name, f.attname)
1294 if not (f.auto_created and not f.concrete)
1295 else (f.field.related_query_name(),)
1296 for f in chain(meta.fields, meta.related_objects)
1297 )
1298 )
1299
1300 invalid_fields.extend(set(fields) - valid_fields)
1301
1302 for invalid_field in invalid_fields:
1303 errors.append(
1304 PreflightResult(
1305 fix="'ordering' refers to the nonexistent field, related "
1306 f"field, or lookup '{invalid_field}'.",
1307 obj=cls,
1308 id="models.ordering_nonexistent_field",
1309 )
1310 )
1311 return errors
1312
1313 @classmethod
1314 def _check_long_column_names(cls) -> list[PreflightResult]:
1315 """
1316 Check that any auto-generated column names are shorter than the limits
1317 for each database in which the model will be created.
1318 """
1319 errors: list[PreflightResult] = []
1320 allowed_len = None
1321
1322 max_name_length = db_connection.ops.max_name_length()
1323 if max_name_length is not None and not db_connection.features.truncates_names:
1324 allowed_len = max_name_length
1325
1326 if allowed_len is None:
1327 return errors
1328
1329 for f in cls._model_meta.local_fields:
1330 column_name = f.column
1331
1332 # Check if column name is too long for the database.
1333 if column_name is not None and len(column_name) > allowed_len:
1334 errors.append(
1335 PreflightResult(
1336 fix=f'Column name too long for field "{column_name}". '
1337 f'Maximum length is "{allowed_len}" for the database.',
1338 obj=cls,
1339 id="models.column_name_too_long",
1340 )
1341 )
1342
1343 for f in cls._model_meta.local_many_to_many:
1344 # Skip nonexistent models.
1345 if isinstance(f.remote_field.through, str):
1346 continue
1347
1348 # Check if column name for the M2M field is too long for the database.
1349 for m2m in f.remote_field.through._model_meta.local_fields:
1350 rel_name = m2m.column
1351 if rel_name is not None and len(rel_name) > allowed_len:
1352 errors.append(
1353 PreflightResult(
1354 fix="Column name too long for M2M field "
1355 f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
1356 obj=cls,
1357 id="models.m2m_column_name_too_long",
1358 )
1359 )
1360
1361 return errors
1362
1363 @classmethod
1364 def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1365 if isinstance(expr, Q):
1366 for child in expr.children:
1367 if isinstance(child, tuple):
1368 lookup, value = child
1369 yield tuple(lookup.split(LOOKUP_SEP))
1370 yield from cls._get_expr_references(value)
1371 else:
1372 yield from cls._get_expr_references(child)
1373 elif isinstance(expr, F):
1374 yield tuple(expr.name.split(LOOKUP_SEP))
1375 elif hasattr(expr, "get_source_expressions"):
1376 for src_expr in expr.get_source_expressions():
1377 yield from cls._get_expr_references(src_expr)
1378
1379 @classmethod
1380 def _check_constraints(cls) -> list[PreflightResult]:
1381 errors: list[PreflightResult] = []
1382 if not (
1383 db_connection.features.supports_table_check_constraints
1384 or "supports_table_check_constraints"
1385 in cls.model_options.required_db_features
1386 ) and any(
1387 isinstance(constraint, CheckConstraint)
1388 for constraint in cls.model_options.constraints
1389 ):
1390 errors.append(
1391 PreflightResult(
1392 fix=f"{db_connection.display_name} does not support check constraints. "
1393 "A constraint won't be created. Silence this "
1394 "warning if you don't care about it.",
1395 obj=cls,
1396 id="models.constraint_on_non_db_field",
1397 warning=True,
1398 )
1399 )
1400
1401 if not (
1402 db_connection.features.supports_partial_indexes
1403 or "supports_partial_indexes" in cls.model_options.required_db_features
1404 ) and any(
1405 isinstance(constraint, UniqueConstraint)
1406 and constraint.condition is not None
1407 for constraint in cls.model_options.constraints
1408 ):
1409 errors.append(
1410 PreflightResult(
1411 fix=f"{db_connection.display_name} does not support unique constraints with "
1412 "conditions. A constraint won't be created. Silence this "
1413 "warning if you don't care about it.",
1414 obj=cls,
1415 id="models.constraint_on_virtual_field",
1416 warning=True,
1417 )
1418 )
1419
1420 if not (
1421 db_connection.features.supports_deferrable_unique_constraints
1422 or "supports_deferrable_unique_constraints"
1423 in cls.model_options.required_db_features
1424 ) and any(
1425 isinstance(constraint, UniqueConstraint)
1426 and constraint.deferrable is not None
1427 for constraint in cls.model_options.constraints
1428 ):
1429 errors.append(
1430 PreflightResult(
1431 fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
1432 "A constraint won't be created. Silence this "
1433 "warning if you don't care about it.",
1434 obj=cls,
1435 id="models.constraint_on_foreign_key",
1436 warning=True,
1437 )
1438 )
1439
1440 if not (
1441 db_connection.features.supports_covering_indexes
1442 or "supports_covering_indexes" in cls.model_options.required_db_features
1443 ) and any(
1444 isinstance(constraint, UniqueConstraint) and constraint.include
1445 for constraint in cls.model_options.constraints
1446 ):
1447 errors.append(
1448 PreflightResult(
1449 fix=f"{db_connection.display_name} does not support unique constraints with non-key "
1450 "columns. A constraint won't be created. Silence this "
1451 "warning if you don't care about it.",
1452 obj=cls,
1453 id="models.constraint_on_m2m_field",
1454 warning=True,
1455 )
1456 )
1457
1458 if not (
1459 db_connection.features.supports_expression_indexes
1460 or "supports_expression_indexes" in cls.model_options.required_db_features
1461 ) and any(
1462 isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
1463 for constraint in cls.model_options.constraints
1464 ):
1465 errors.append(
1466 PreflightResult(
1467 fix=f"{db_connection.display_name} does not support unique constraints on "
1468 "expressions. A constraint won't be created. Silence this "
1469 "warning if you don't care about it.",
1470 obj=cls,
1471 id="models.constraint_on_self_referencing_fk",
1472 warning=True,
1473 )
1474 )
1475 fields = set(
1476 chain.from_iterable(
1477 (*constraint.fields, *constraint.include)
1478 for constraint in cls.model_options.constraints
1479 if isinstance(constraint, UniqueConstraint)
1480 )
1481 )
1482 references = set()
1483 for constraint in cls.model_options.constraints:
1484 if isinstance(constraint, UniqueConstraint):
1485 if (
1486 db_connection.features.supports_partial_indexes
1487 or "supports_partial_indexes"
1488 not in cls.model_options.required_db_features
1489 ) and isinstance(constraint.condition, Q):
1490 references.update(cls._get_expr_references(constraint.condition))
1491 if (
1492 db_connection.features.supports_expression_indexes
1493 or "supports_expression_indexes"
1494 not in cls.model_options.required_db_features
1495 ) and constraint.contains_expressions:
1496 for expression in constraint.expressions:
1497 references.update(cls._get_expr_references(expression))
1498 elif isinstance(constraint, CheckConstraint):
1499 if (
1500 db_connection.features.supports_table_check_constraints
1501 or "supports_table_check_constraints"
1502 not in cls.model_options.required_db_features
1503 ):
1504 if isinstance(constraint.check, Q):
1505 references.update(cls._get_expr_references(constraint.check))
1506 if any(
1507 isinstance(expr, RawSQL) for expr in constraint.check.flatten()
1508 ):
1509 errors.append(
1510 PreflightResult(
1511 fix=f"Check constraint {constraint.name!r} contains "
1512 f"RawSQL() expression and won't be validated "
1513 f"during the model full_clean(). "
1514 "Silence this warning if you don't care about it.",
1515 warning=True,
1516 obj=cls,
1517 id="models.constraint_name_collision_autogenerated",
1518 ),
1519 )
1520 for field_name, *lookups in references:
1521 fields.add(field_name)
1522 if not lookups:
1523 # If it has no lookups it cannot result in a JOIN.
1524 continue
1525 try:
1526 field = cls._model_meta.get_field(field_name)
1527 from plain.models.fields.related import ManyToManyField
1528 from plain.models.fields.reverse_related import ForeignKeyRel
1529
1530 if (
1531 not isinstance(field, RelatedField)
1532 or isinstance(field, ManyToManyField)
1533 or isinstance(field, ForeignKeyRel)
1534 ):
1535 continue
1536 except FieldDoesNotExist:
1537 continue
1538 # JOIN must happen at the first lookup.
1539 first_lookup = lookups[0]
1540 if (
1541 hasattr(field, "get_transform")
1542 and hasattr(field, "get_lookup")
1543 and field.get_transform(first_lookup) is None
1544 and field.get_lookup(first_lookup) is None
1545 ):
1546 errors.append(
1547 PreflightResult(
1548 fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1549 obj=cls,
1550 id="models.constraint_refers_to_joined_field",
1551 )
1552 )
1553 errors.extend(cls._check_local_fields(fields, "constraints"))
1554 return errors
1555
1556
1557########
1558# MISC #
1559########
1560
1561
1562def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
1563 """Used to unpickle Model subclasses with deferred fields."""
1564 if isinstance(model_id, tuple):
1565 model = models_registry.get_model(*model_id)
1566 else:
1567 # Backwards compat - the model was cached directly in earlier versions.
1568 model = model_id
1569 return model.__new__(model)
1570
1571
1572# Pickle protocol marker - functions don't normally have this attribute
1573model_unpickle.__safe_for_unpickle__ = True # type: ignore[attr-defined]