1from __future__ import annotations
2
3import copy
4import warnings
5from collections.abc import Iterable, Iterator, Sequence
6from itertools import chain
7from typing import TYPE_CHECKING, Any, cast
8
9if TYPE_CHECKING:
10 from plain.models.meta import Meta
11 from plain.models.options import Options
12
13import plain.runtime
14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
15from plain.models import models_registry, transaction, types
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21 db_connection,
22)
23from plain.models.deletion import Collector
24from plain.models.exceptions import (
25 DoesNotExistDescriptor,
26 FieldDoesNotExist,
27 MultipleObjectsReturnedDescriptor,
28)
29from plain.models.expressions import RawSQL, Value
30from plain.models.fields import NOT_PROVIDED, Field
31from plain.models.fields.related import RelatedField
32from plain.models.fields.reverse_related import ForeignObjectRel
33from plain.models.meta import Meta
34from plain.models.options import Options
35from plain.models.query import F, Q, QuerySet
36from plain.preflight import PreflightResult
37from plain.utils.encoding import force_str
38from plain.utils.hashable import make_hashable
39
40
41class Deferred:
42 def __repr__(self) -> str:
43 return "<Deferred field>"
44
45 def __str__(self) -> str:
46 return "<Deferred field>"
47
48
49DEFERRED = Deferred()
50
51
52class ModelBase(type):
53 """Metaclass for all models."""
54
55 def __new__(
56 cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
57 ) -> type:
58 # Don't do any of this for the root models.Model class.
59 if not bases:
60 return super().__new__(cls, name, bases, attrs)
61
62 for base in bases:
63 # Models are required to directly inherit from model.Model, not a subclass of it.
64 if issubclass(base, Model) and base is not Model:
65 raise TypeError(
66 f"A model can't extend another model: {name} extends {base}"
67 )
68
69 return super().__new__(cls, name, bases, attrs, **kwargs)
70
71
72class ModelStateFieldsCacheDescriptor:
73 def __get__(
74 self, instance: ModelState | None, cls: type | None = None
75 ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
76 if instance is None:
77 return self
78 res = instance.fields_cache = {}
79 return res
80
81
82class ModelState:
83 """Store model instance state."""
84
85 # If true, uniqueness validation checks will consider this a new, unsaved
86 # object. Necessary for correct validation of new instances of objects with
87 # explicit (non-auto) PKs. This impacts validation only; it has no effect
88 # on the actual save.
89 adding = True
90 fields_cache = ModelStateFieldsCacheDescriptor()
91
92
93class Model(metaclass=ModelBase):
94 # Every model gets an automatic id field
95 id: int = types.PrimaryKeyField()
96
97 # Descriptors for other model behavior
98 query: QuerySet[Model] = QuerySet()
99 model_options: Options = Options()
100 _model_meta: Meta = Meta()
101 DoesNotExist = DoesNotExistDescriptor()
102 MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
103
104 def __init__(self, **kwargs: Any):
105 # Alias some things as locals to avoid repeat global lookups
106 cls = self.__class__
107 meta = cls._model_meta
108 _setattr = setattr
109 _DEFERRED = DEFERRED
110
111 # Set up the storage for instance state
112 self._state = ModelState()
113
114 # Process all fields from kwargs or use defaults
115 for field in meta.fields:
116 from plain.models.fields.related import RelatedField
117
118 is_related_object = False
119 # Virtual field
120 if field.attname not in kwargs and field.column is None:
121 continue
122 if isinstance(field, RelatedField) and isinstance(
123 field.remote_field, ForeignObjectRel
124 ):
125 try:
126 # Assume object instance was passed in.
127 rel_obj = kwargs.pop(field.name)
128 is_related_object = True
129 except KeyError:
130 try:
131 # Object instance wasn't passed in -- must be an ID.
132 val = kwargs.pop(field.attname)
133 except KeyError:
134 val = field.get_default()
135 else:
136 try:
137 val = kwargs.pop(field.attname)
138 except KeyError:
139 # This is done with an exception rather than the
140 # default argument on pop because we don't want
141 # get_default() to be evaluated, and then not used.
142 # Refs #12057.
143 val = field.get_default()
144
145 if is_related_object:
146 # If we are passed a related instance, set it using the
147 # field.name instead of field.attname (e.g. "user" instead of
148 # "user_id") so that the object gets properly cached (and type
149 # checked) by the RelatedObjectDescriptor.
150 if rel_obj is not _DEFERRED:
151 _setattr(self, field.name, rel_obj)
152 else:
153 if val is not _DEFERRED:
154 _setattr(self, field.attname, val)
155
156 # Handle any remaining kwargs (properties or virtual fields)
157 property_names = meta._property_names
158 unexpected = ()
159 for prop, value in kwargs.items():
160 # Any remaining kwargs must correspond to properties or virtual
161 # fields.
162 if prop in property_names:
163 if value is not _DEFERRED:
164 _setattr(self, prop, value)
165 else:
166 try:
167 meta.get_field(prop)
168 except FieldDoesNotExist:
169 unexpected += (prop,)
170 else:
171 if value is not _DEFERRED:
172 _setattr(self, prop, value)
173 if unexpected:
174 unexpected_names = ", ".join(repr(n) for n in unexpected)
175 raise TypeError(
176 f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
177 )
178
179 super().__init__()
180
181 @classmethod
182 def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
183 if len(values) != len(cls._model_meta.concrete_fields):
184 values_iter = iter(values)
185 values = [
186 next(values_iter) if f.attname in field_names else DEFERRED
187 for f in cls._model_meta.concrete_fields
188 ]
189 # Build kwargs dict from field names and values
190 field_dict = dict(
191 zip((f.attname for f in cls._model_meta.concrete_fields), values)
192 )
193 new = cls(**field_dict)
194 new._state.adding = False
195 return new
196
197 def __repr__(self) -> str:
198 return f"<{self.__class__.__name__}: {self}>"
199
200 def __str__(self) -> str:
201 return f"{self.__class__.__name__} object ({self.id})"
202
203 def __eq__(self, other: object) -> bool:
204 if not isinstance(other, Model):
205 return NotImplemented
206 if self.__class__ != other.__class__:
207 return False
208 my_id = self.id
209 if my_id is None:
210 return self is other
211 return my_id == other.id
212
213 def __hash__(self) -> int:
214 if self.id is None:
215 raise TypeError("Model instances without primary key value are unhashable")
216 return hash(self.id)
217
218 def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
219 data = self.__getstate__()
220 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
221 class_id = (
222 self.model_options.package_label,
223 self.model_options.object_name,
224 )
225 return model_unpickle, (class_id,), data
226
227 def __getstate__(self) -> dict[str, Any]:
228 """Hook to allow choosing the attributes to pickle."""
229 state = self.__dict__.copy()
230 state["_state"] = copy.copy(state["_state"])
231 state["_state"].fields_cache = state["_state"].fields_cache.copy()
232 # memoryview cannot be pickled, so cast it to bytes and store
233 # separately.
234 _memoryview_attrs = []
235 for attr, value in state.items():
236 if isinstance(value, memoryview):
237 _memoryview_attrs.append((attr, bytes(value)))
238 if _memoryview_attrs:
239 state["_memoryview_attrs"] = _memoryview_attrs
240 for attr, value in _memoryview_attrs:
241 state.pop(attr)
242 return state
243
244 def __setstate__(self, state: dict[str, Any]) -> None:
245 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
246 if pickled_version:
247 if pickled_version != plain.runtime.__version__:
248 warnings.warn(
249 f"Pickled model instance's Plain version {pickled_version} does not "
250 f"match the current version {plain.runtime.__version__}.",
251 RuntimeWarning,
252 stacklevel=2,
253 )
254 else:
255 warnings.warn(
256 "Pickled model instance's Plain version is not specified.",
257 RuntimeWarning,
258 stacklevel=2,
259 )
260 if "_memoryview_attrs" in state:
261 for attr, value in state.pop("_memoryview_attrs"):
262 state[attr] = memoryview(value)
263 self.__dict__.update(state)
264
265 def get_deferred_fields(self) -> set[str]:
266 """
267 Return a set containing names of deferred fields for this instance.
268 """
269 return {
270 f.attname
271 for f in self._model_meta.concrete_fields
272 if f.attname not in self.__dict__
273 }
274
275 def refresh_from_db(self, fields: list[str] | None = None) -> None:
276 """
277 Reload field values from the database.
278
279 By default, the reloading happens from the database this instance was
280 loaded from, or by the read router if this instance wasn't loaded from
281 any database. The using parameter will override the default.
282
283 Fields can be used to specify which fields to reload. The fields
284 should be an iterable of field attnames. If fields is None, then
285 all non-deferred fields are reloaded.
286
287 When accessing deferred fields of an instance, the deferred loading
288 of the field will call this method.
289 """
290 if fields is None:
291 self._prefetched_objects_cache = {}
292 else:
293 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", {})
294 for field in fields:
295 if field in prefetched_objects_cache:
296 del prefetched_objects_cache[field]
297 fields.remove(field)
298 if not fields:
299 return
300 if any(LOOKUP_SEP in f for f in fields):
301 raise ValueError(
302 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
303 "are not allowed in fields."
304 )
305
306 db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
307
308 # Use provided fields, if not set then reload all non-deferred fields.
309 deferred_fields = self.get_deferred_fields()
310 if fields is not None:
311 fields = list(fields)
312 db_instance_qs = db_instance_qs.only(*fields)
313 elif deferred_fields:
314 fields = [
315 f.attname
316 for f in self._model_meta.concrete_fields
317 if f.attname not in deferred_fields
318 ]
319 db_instance_qs = db_instance_qs.only(*fields)
320
321 db_instance = db_instance_qs.get()
322 non_loaded_fields = db_instance.get_deferred_fields()
323 for field in self._model_meta.concrete_fields:
324 if field.attname in non_loaded_fields:
325 # This field wasn't refreshed - skip ahead.
326 continue
327 setattr(self, field.attname, getattr(db_instance, field.attname))
328 # Clear cached foreign keys.
329 if isinstance(field, RelatedField) and field.is_cached(self):
330 field.delete_cached_value(self)
331
332 # Clear cached relations.
333 for field in self._model_meta.related_objects:
334 if field.is_cached(self):
335 field.delete_cached_value(self)
336
337 def serializable_value(self, field_name: str) -> Any:
338 """
339 Return the value of the field name for this instance. If the field is
340 a foreign key, return the id value instead of the object. If there's
341 no Field object with this name on the model, return the model
342 attribute's value.
343
344 Used to serialize a field's value (in the serializer, or form output,
345 for example). Normally, you would just access the attribute directly
346 and not use this method.
347 """
348 try:
349 field = self._model_meta.get_forward_field(field_name)
350 except FieldDoesNotExist:
351 return getattr(self, field_name)
352 return getattr(self, field.attname)
353
354 def save(
355 self,
356 *,
357 clean_and_validate: bool = True,
358 force_insert: bool = False,
359 force_update: bool = False,
360 update_fields: Iterable[str] | None = None,
361 ) -> None:
362 """
363 Save the current instance. Override this in a subclass if you want to
364 control the saving process.
365
366 The 'force_insert' and 'force_update' parameters can be used to insist
367 that the "save" must be an SQL insert or update (or equivalent for
368 non-SQL backends), respectively. Normally, they should not be set.
369 """
370 self._prepare_related_fields_for_save(operation_name="save")
371
372 if force_insert and (force_update or update_fields):
373 raise ValueError("Cannot force both insert and updating in model saving.")
374
375 deferred_fields = self.get_deferred_fields()
376 if update_fields is not None:
377 # If update_fields is empty, skip the save. We do also check for
378 # no-op saves later on for inheritance cases. This bailout is
379 # still needed for skipping signal sending.
380 if not update_fields:
381 return
382
383 update_fields = frozenset(update_fields)
384 field_names = self._model_meta._non_pk_concrete_field_names
385 non_model_fields = update_fields.difference(field_names)
386
387 if non_model_fields:
388 raise ValueError(
389 "The following fields do not exist in this model, are m2m "
390 "fields, or are non-concrete fields: {}".format(
391 ", ".join(non_model_fields)
392 )
393 )
394
395 # If this model is deferred, automatically do an "update_fields" save
396 # on the loaded fields.
397 elif not force_insert and deferred_fields:
398 field_names = set()
399 for field in self._model_meta.concrete_fields:
400 if not field.primary_key and not hasattr(field, "through"):
401 field_names.add(field.attname)
402 loaded_fields = field_names.difference(deferred_fields)
403 if loaded_fields:
404 update_fields = frozenset(loaded_fields)
405
406 if clean_and_validate:
407 self.full_clean(exclude=deferred_fields)
408
409 self.save_base(
410 force_insert=force_insert,
411 force_update=force_update,
412 update_fields=update_fields,
413 )
414
415 def save_base(
416 self,
417 *,
418 raw: bool = False,
419 force_insert: bool = False,
420 force_update: bool = False,
421 update_fields: Iterable[str] | None = None,
422 ) -> None:
423 """
424 Handle the parts of saving which should be done only once per save,
425 yet need to be done in raw saves, too. This includes some sanity
426 checks and signal sending.
427
428 The 'raw' argument is telling save_base not to save any parent
429 models and not to do any changes to the values before save. This
430 is used by fixture loading.
431 """
432 assert not (force_insert and (force_update or update_fields))
433 assert update_fields is None or update_fields
434 cls = self.__class__
435
436 with transaction.mark_for_rollback_on_error():
437 self._save_table(
438 raw=raw,
439 cls=cls,
440 force_insert=force_insert,
441 force_update=force_update,
442 update_fields=update_fields,
443 )
444 # Once saved, this is no longer a to-be-added instance.
445 self._state.adding = False
446
447 def _save_table(
448 self,
449 *,
450 raw: bool,
451 cls: type[Model],
452 force_insert: bool = False,
453 force_update: bool = False,
454 update_fields: Iterable[str] | None = None,
455 ) -> bool:
456 """
457 Do the heavy-lifting involved in saving. Update or insert the data
458 for a single table.
459 """
460 meta = cls._model_meta
461 non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
462
463 if update_fields:
464 non_pks = [
465 f
466 for f in non_pks
467 if f.name in update_fields or f.attname in update_fields
468 ]
469
470 id_val = self.id
471 if id_val is None:
472 id_field = meta.get_forward_field("id")
473 id_val = id_field.get_id_value_on_save(self)
474 setattr(self, id_field.attname, id_val)
475 id_set = id_val is not None
476 if not id_set and (force_update or update_fields):
477 raise ValueError("Cannot force an update in save() with no primary key.")
478 updated = False
479 # Skip an UPDATE when adding an instance and primary key has a default.
480 if (
481 not raw
482 and not force_insert
483 and self._state.adding
484 and meta.get_forward_field("id").default
485 and meta.get_forward_field("id").default is not NOT_PROVIDED
486 ):
487 force_insert = True
488 # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
489 if id_set and not force_insert:
490 base_qs = meta.base_queryset
491 values = [
492 (
493 f,
494 None,
495 (getattr(self, f.attname) if raw else f.pre_save(self, False)),
496 )
497 for f in non_pks
498 ]
499 forced_update = bool(update_fields or force_update)
500 updated = self._do_update(
501 base_qs, id_val, values, update_fields, forced_update
502 )
503 if force_update and not updated:
504 raise DatabaseError("Forced update did not affect any rows.")
505 if update_fields and not updated:
506 raise DatabaseError("Save with update_fields did not affect any rows.")
507 if not updated:
508 fields = meta.local_concrete_fields
509 if not id_set:
510 id_field = meta.get_forward_field("id")
511 fields = [f for f in fields if f is not id_field]
512
513 returning_fields = meta.db_returning_fields
514 results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
515 if results:
516 for value, field in zip(results[0], returning_fields):
517 setattr(self, field.attname, value)
518 return updated
519
520 def _do_update(
521 self,
522 base_qs: QuerySet,
523 id_val: Any,
524 values: list[tuple[Any, Any, Any]],
525 update_fields: Iterable[str] | None,
526 forced_update: bool,
527 ) -> bool:
528 """
529 Try to update the model. Return True if the model was updated (if an
530 update query was done and a matching row was found in the DB).
531 """
532 filtered = base_qs.filter(id=id_val)
533 if not values:
534 # We can end up here when saving a model in inheritance chain where
535 # update_fields doesn't target any field in current model. In that
536 # case we just say the update succeeded. Another case ending up here
537 # is a model with just PK - in that case check that the PK still
538 # exists.
539 return update_fields is not None or filtered.exists()
540 return filtered._update(values) > 0
541
542 def _do_insert(
543 self,
544 manager: QuerySet,
545 fields: Sequence[Any],
546 returning_fields: Sequence[Any],
547 raw: bool,
548 ) -> list[tuple[Any, ...]] | None:
549 """
550 Do an INSERT. If returning_fields is defined then this method should
551 return the newly created data for the model.
552 """
553 return manager._insert(
554 [self],
555 fields=list(fields),
556 returning_fields=list(returning_fields) if returning_fields else None,
557 raw=raw,
558 )
559
560 def _prepare_related_fields_for_save(
561 self, operation_name: str, fields: Sequence[Any] | None = None
562 ) -> None:
563 # Ensure that a model instance without a PK hasn't been assigned to
564 # a ForeignKeyField on this model. If the field is nullable, allowing the save would result in silent data loss.
565 for field in self._model_meta.concrete_fields:
566 if fields and field not in fields:
567 continue
568 # If the related field isn't cached, then an instance hasn't been
569 # assigned and there's no need to worry about this check.
570 if isinstance(field, RelatedField) and field.is_cached(self):
571 obj = getattr(self, field.name, None)
572 if not obj:
573 continue
574 # A pk may have been assigned manually to a model instance not
575 # saved to the database (or auto-generated in a case like
576 # UUIDField), but we allow the save to proceed and rely on the
577 # database to raise an IntegrityError if applicable. If
578 # constraints aren't supported by the database, there's the
579 # unavoidable risk of data corruption.
580 if obj.id is None:
581 # Remove the object from a related instance cache.
582 if not field.remote_field.multiple:
583 field.remote_field.delete_cached_value(obj)
584 raise ValueError(
585 f"{operation_name}() prohibited to prevent data loss due to unsaved "
586 f"related object '{field.name}'."
587 )
588 elif getattr(self, field.attname) in field.empty_values:
589 # Set related object if it has been saved after an
590 # assignment.
591 setattr(self, field.name, obj)
592 # If the relationship's pk/to_field was changed, clear the
593 # cached relationship.
594 if getattr(obj, field.target_field.attname) != getattr(
595 self, field.attname
596 ):
597 field.delete_cached_value(self)
598
599 def delete(self) -> tuple[int, dict[str, int]]:
600 if self.id is None:
601 raise ValueError(
602 f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
603 "to None."
604 )
605 collector = Collector(origin=self)
606 collector.collect([self])
607 return collector.delete()
608
609 def get_field_display(self, field_name: str) -> str:
610 """Get the display value for a field, especially useful for fields with choices."""
611 # Get the field object from the field name
612 field = self._model_meta.get_forward_field(field_name)
613 value = getattr(self, field.attname)
614
615 # If field has no choices, just return the value as string
616 if not hasattr(field, "flatchoices") or not field.flatchoices:
617 return force_str(value, strings_only=True)
618
619 # For fields with choices, look up the display value
620 choices_dict = dict(make_hashable(field.flatchoices))
621 return force_str(
622 choices_dict.get(make_hashable(value), value), strings_only=True
623 )
624
625 def _get_field_value_map(
626 self, meta: Meta | None, exclude: set[str] | None = None
627 ) -> dict[str, Value]:
628 if exclude is None:
629 exclude = set()
630 meta = meta or self._model_meta
631 return {
632 field.name: Value(getattr(self, field.attname), field)
633 for field in meta.local_concrete_fields
634 if field.name not in exclude
635 }
636
637 def prepare_database_save(self, field: Any) -> Any:
638 if self.id is None:
639 raise ValueError(
640 f"Unsaved model instance {self!r} cannot be used in an ORM query."
641 )
642 return getattr(self, field.remote_field.get_related_field().attname)
643
644 def clean(self) -> None:
645 """
646 Hook for doing any extra model-wide validation after clean() has been
647 called on every field by self.clean_fields. Any ValidationError raised
648 by this method will not be associated with a particular field; it will
649 have a special-case association with the field defined by NON_FIELD_ERRORS.
650 """
651 pass
652
653 def validate_unique(self, exclude: set[str] | None = None) -> None:
654 """
655 Check unique constraints on the model and raise ValidationError if any
656 failed.
657 """
658 unique_checks = self._get_unique_checks(exclude=exclude)
659
660 if errors := self._perform_unique_checks(unique_checks):
661 raise ValidationError(errors)
662
663 def _get_unique_checks(
664 self, exclude: set[str] | None = None
665 ) -> list[tuple[type[Model], tuple[str, ...]]]:
666 """
667 Return a list of checks to perform. Since validate_unique() could be
668 called from a ModelForm, some fields may have been excluded; we can't
669 perform a unique check on a model that is missing fields involved
670 in that check. Fields that did not validate should also be excluded,
671 but they need to be passed in via the exclude argument.
672 """
673 if exclude is None:
674 exclude = set()
675 unique_checks = []
676
677 # Gather a list of checks for fields declared as unique and add them to
678 # the list of checks.
679
680 fields_with_class = [(self.__class__, self._model_meta.local_fields)]
681
682 for model_class, fields in fields_with_class:
683 for f in fields:
684 name = f.name
685 if name in exclude:
686 continue
687 if f.primary_key:
688 unique_checks.append((model_class, (name,)))
689
690 return unique_checks
691
692 def _perform_unique_checks(
693 self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
694 ) -> dict[str, list[ValidationError]]:
695 errors = {}
696
697 for model_class, unique_check in unique_checks:
698 # Try to look up an existing object with the same values as this
699 # object's values for all the unique field.
700
701 lookup_kwargs = {}
702 for field_name in unique_check:
703 f = self._model_meta.get_forward_field(field_name)
704 lookup_value = getattr(self, f.attname)
705 # TODO: Handle multiple backends with different feature flags.
706 if lookup_value is None:
707 # no value, skip the lookup
708 continue
709 if f.primary_key and not self._state.adding:
710 # no need to check for unique primary key when editing
711 continue
712 lookup_kwargs[str(field_name)] = lookup_value
713
714 # some fields were skipped, no reason to do the check
715 if len(unique_check) != len(lookup_kwargs):
716 continue
717
718 qs = model_class.query.filter(**lookup_kwargs)
719
720 # Exclude the current object from the query if we are editing an
721 # instance (as opposed to creating a new one)
722 # Use the primary key defined by model_class. In previous versions
723 # this could differ from `self.id` due to model inheritance.
724 model_class_id = getattr(self, "id")
725 if not self._state.adding and model_class_id is not None:
726 qs = qs.exclude(id=model_class_id)
727 if qs.exists():
728 if len(unique_check) == 1:
729 key = unique_check[0]
730 else:
731 key = NON_FIELD_ERRORS
732 errors.setdefault(key, []).append(
733 self.unique_error_message(model_class, unique_check)
734 )
735
736 return errors
737
738 def unique_error_message(
739 self, model_class: type[Model], unique_check: tuple[str, ...]
740 ) -> ValidationError:
741 meta = model_class._model_meta
742
743 params = {
744 "model": self,
745 "model_class": model_class,
746 "model_name": model_class.model_options.model_name,
747 "unique_check": unique_check,
748 }
749
750 if len(unique_check) == 1:
751 field = meta.get_forward_field(unique_check[0])
752 params["field_label"] = field.name
753 return ValidationError(
754 message=field.error_messages["unique"],
755 code="unique",
756 params=params,
757 )
758 else:
759 field_names = [meta.get_forward_field(f).name for f in unique_check]
760
761 # Put an "and" before the last one
762 field_names[-1] = f"and {field_names[-1]}"
763
764 if len(field_names) > 2:
765 # Comma join if more than 2
766 params["field_label"] = ", ".join(cast(list[str], field_names))
767 else:
768 # Just a space if there are only 2
769 params["field_label"] = " ".join(cast(list[str], field_names))
770
771 # Use the first field as the message format...
772 message = meta.get_forward_field(unique_check[0]).error_messages["unique"]
773
774 return ValidationError(
775 message=message,
776 code="unique",
777 params=params,
778 )
779
780 def get_constraints(self) -> list[tuple[type[Model], list[Any]]]:
781 constraints = [(self.__class__, list(self.model_options.constraints))]
782 return constraints
783
784 def validate_constraints(self, exclude: set[str] | None = None) -> None:
785 constraints = self.get_constraints()
786
787 errors = {}
788 for model_class, model_constraints in constraints:
789 for constraint in model_constraints:
790 try:
791 constraint.validate(model_class, self, exclude=exclude)
792 except ValidationError as e:
793 if (
794 getattr(e, "code", None) == "unique"
795 and len(constraint.fields) == 1
796 ):
797 errors.setdefault(constraint.fields[0], []).append(e)
798 else:
799 errors = e.update_error_dict(errors)
800 if errors:
801 raise ValidationError(errors)
802
803 def full_clean(
804 self,
805 *,
806 exclude: set[str] | Iterable[str] | None = None,
807 validate_unique: bool = True,
808 validate_constraints: bool = True,
809 ) -> None:
810 """
811 Call clean_fields(), clean(), validate_unique(), and
812 validate_constraints() on the model. Raise a ValidationError for any
813 errors that occur.
814 """
815 errors = {}
816 if exclude is None:
817 exclude = set()
818 else:
819 exclude = set(exclude)
820
821 try:
822 self.clean_fields(exclude=exclude)
823 except ValidationError as e:
824 errors = e.update_error_dict(errors)
825
826 # Form.clean() is run even if other validation fails, so do the
827 # same with Model.clean() for consistency.
828 try:
829 self.clean()
830 except ValidationError as e:
831 errors = e.update_error_dict(errors)
832
833 # Run unique checks, but only for fields that passed validation.
834 if validate_unique:
835 for name in errors:
836 if name != NON_FIELD_ERRORS and name not in exclude:
837 exclude.add(name)
838 try:
839 self.validate_unique(exclude=exclude)
840 except ValidationError as e:
841 errors = e.update_error_dict(errors)
842
843 # Run constraints checks, but only for fields that passed validation.
844 if validate_constraints:
845 for name in errors:
846 if name != NON_FIELD_ERRORS and name not in exclude:
847 exclude.add(name)
848 try:
849 self.validate_constraints(exclude=exclude)
850 except ValidationError as e:
851 errors = e.update_error_dict(errors)
852
853 if errors:
854 raise ValidationError(errors)
855
856 def clean_fields(self, exclude: set[str] | None = None) -> None:
857 """
858 Clean all fields and raise a ValidationError containing a dict
859 of all validation errors if any occur.
860 """
861 if exclude is None:
862 exclude = set()
863
864 errors = {}
865 for f in self._model_meta.fields:
866 if f.name in exclude:
867 continue
868 # Skip validation for empty fields with required=False. The developer
869 # is responsible for making sure they have a valid value.
870 raw_value = getattr(self, f.attname)
871 if not f.required and raw_value in f.empty_values:
872 continue
873 try:
874 setattr(self, f.attname, f.clean(raw_value, self))
875 except ValidationError as e:
876 errors[f.name] = e.error_list
877
878 if errors:
879 raise ValidationError(errors)
880
881 @classmethod
882 def preflight(cls) -> list[PreflightResult]:
883 errors: list[PreflightResult] = []
884
885 errors += [
886 *cls._check_fields(),
887 *cls._check_m2m_through_same_relationship(),
888 *cls._check_long_column_names(),
889 ]
890 clash_errors = (
891 *cls._check_id_field(),
892 *cls._check_field_name_clashes(),
893 *cls._check_model_name_db_lookup_clashes(),
894 *cls._check_property_name_related_field_accessor_clashes(),
895 *cls._check_single_primary_key(),
896 )
897 errors.extend(clash_errors)
898 # If there are field name clashes, hide consequent column name
899 # clashes.
900 if not clash_errors:
901 errors.extend(cls._check_column_name_clashes())
902 errors += [
903 *cls._check_indexes(),
904 *cls._check_ordering(),
905 *cls._check_constraints(),
906 *cls._check_db_table_comment(),
907 ]
908
909 return errors
910
911 @classmethod
912 def _check_db_table_comment(cls) -> list[PreflightResult]:
913 if not cls.model_options.db_table_comment:
914 return []
915 errors: list[PreflightResult] = []
916 if not (
917 db_connection.features.supports_comments
918 or "supports_comments" in cls.model_options.required_db_features
919 ):
920 errors.append(
921 PreflightResult(
922 fix=f"{db_connection.display_name} does not support comments on "
923 f"tables (db_table_comment).",
924 obj=cls,
925 id="models.db_table_comment_unsupported",
926 warning=True,
927 )
928 )
929 return errors
930
931 @classmethod
932 def _check_fields(cls) -> list[PreflightResult]:
933 """Perform all field checks."""
934 errors: list[PreflightResult] = []
935 for field in cls._model_meta.local_fields:
936 errors.extend(field.preflight(from_model=cls))
937 for field in cls._model_meta.local_many_to_many:
938 errors.extend(field.preflight(from_model=cls))
939 return errors
940
941 @classmethod
942 def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
943 """Check if no relationship model is used by more than one m2m field."""
944
945 errors: list[PreflightResult] = []
946 seen_intermediary_signatures = []
947
948 fields = cls._model_meta.local_many_to_many
949
950 # Skip when the target model wasn't found.
951 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
952
953 # Skip when the relationship model wasn't found.
954 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
955
956 for f in fields:
957 signature = (
958 f.remote_field.model,
959 cls,
960 f.remote_field.through,
961 f.remote_field.through_fields,
962 )
963 if signature in seen_intermediary_signatures:
964 errors.append(
965 PreflightResult(
966 fix="The model has two identical many-to-many relations "
967 f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
968 obj=cls,
969 id="models.duplicate_many_to_many_relations",
970 )
971 )
972 else:
973 seen_intermediary_signatures.append(signature)
974 return errors
975
976 @classmethod
977 def _check_id_field(cls) -> list[PreflightResult]:
978 """Disallow user-defined fields named ``id``."""
979 if any(
980 f
981 for f in cls._model_meta.local_fields
982 if f.name == "id" and not f.auto_created
983 ):
984 return [
985 PreflightResult(
986 fix="'id' is a reserved word that cannot be used as a field name.",
987 obj=cls,
988 id="models.reserved_field_name_id",
989 )
990 ]
991 return []
992
993 @classmethod
994 def _check_field_name_clashes(cls) -> list[PreflightResult]:
995 """Forbid field shadowing in multi-table inheritance."""
996 errors: list[PreflightResult] = []
997 used_fields = {} # name or attname -> field
998
999 for f in cls._model_meta.local_fields:
1000 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1001 # Note that we may detect clash between user-defined non-unique
1002 # field "id" and automatically added unique field "id", both
1003 # defined at the same model. This special case is considered in
1004 # _check_id_field and here we ignore it.
1005 id_conflict = (
1006 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1007 )
1008 if clash and not id_conflict:
1009 errors.append(
1010 PreflightResult(
1011 fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
1012 f"from model '{clash.model.model_options}'.",
1013 obj=f,
1014 id="models.field_name_clash",
1015 )
1016 )
1017 used_fields[f.name] = f
1018 used_fields[f.attname] = f
1019
1020 return errors
1021
1022 @classmethod
1023 def _check_column_name_clashes(cls) -> list[PreflightResult]:
1024 # Store a list of column names which have already been used by other fields.
1025 used_column_names: list[str] = []
1026 errors: list[PreflightResult] = []
1027
1028 for f in cls._model_meta.local_fields:
1029 _, column_name = f.get_attname_column()
1030
1031 # Ensure the column name is not already in use.
1032 if column_name and column_name in used_column_names:
1033 errors.append(
1034 PreflightResult(
1035 fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1036 "another field. Specify a 'db_column' for the field.",
1037 obj=cls,
1038 id="models.db_column_clash",
1039 )
1040 )
1041 else:
1042 used_column_names.append(column_name)
1043
1044 return errors
1045
1046 @classmethod
1047 def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1048 errors: list[PreflightResult] = []
1049 model_name = cls.__name__
1050 if model_name.startswith("_") or model_name.endswith("_"):
1051 errors.append(
1052 PreflightResult(
1053 fix=f"The model name '{model_name}' cannot start or end with an underscore "
1054 "as it collides with the query lookup syntax.",
1055 obj=cls,
1056 id="models.model_name_underscore_bounds",
1057 )
1058 )
1059 elif LOOKUP_SEP in model_name:
1060 errors.append(
1061 PreflightResult(
1062 fix=f"The model name '{model_name}' cannot contain double underscores as "
1063 "it collides with the query lookup syntax.",
1064 obj=cls,
1065 id="models.model_name_double_underscore",
1066 )
1067 )
1068 return errors
1069
1070 @classmethod
1071 def _check_property_name_related_field_accessor_clashes(
1072 cls,
1073 ) -> list[PreflightResult]:
1074 errors: list[PreflightResult] = []
1075 property_names = cls._model_meta._property_names
1076 related_field_accessors = (
1077 f.get_attname()
1078 for f in cls._model_meta._get_fields(reverse=False)
1079 if isinstance(f, RelatedField) and f.related_model is not None
1080 )
1081 for accessor in related_field_accessors:
1082 if accessor in property_names:
1083 errors.append(
1084 PreflightResult(
1085 fix=f"The property '{accessor}' clashes with a related field "
1086 "accessor.",
1087 obj=cls,
1088 id="models.property_related_field_clash",
1089 )
1090 )
1091 return errors
1092
1093 @classmethod
1094 def _check_single_primary_key(cls) -> list[PreflightResult]:
1095 errors: list[PreflightResult] = []
1096 if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1097 errors.append(
1098 PreflightResult(
1099 fix="The model cannot have more than one field with "
1100 "'primary_key=True'.",
1101 obj=cls,
1102 id="models.multiple_primary_keys",
1103 )
1104 )
1105 return errors
1106
1107 @classmethod
1108 def _check_indexes(cls) -> list[PreflightResult]:
1109 """Check fields, names, and conditions of indexes."""
1110 errors: list[PreflightResult] = []
1111 references: set[str] = set()
1112 for index in cls.model_options.indexes:
1113 # Index name can't start with an underscore or a number, restricted
1114 # for cross-database compatibility with Oracle.
1115 if index.name[0] == "_" or index.name[0].isdigit():
1116 errors.append(
1117 PreflightResult(
1118 fix=f"The index name '{index.name}' cannot start with an underscore "
1119 "or a number.",
1120 obj=cls,
1121 id="models.index_name_invalid_start",
1122 ),
1123 )
1124 if len(index.name) > index.max_name_length:
1125 errors.append(
1126 PreflightResult(
1127 fix="The index name '%s' cannot be longer than %d " # noqa: UP031
1128 "characters." % (index.name, index.max_name_length),
1129 obj=cls,
1130 id="models.index_name_too_long",
1131 ),
1132 )
1133 if index.contains_expressions:
1134 for expression in index.expressions:
1135 references.update(
1136 ref[0] for ref in cls._get_expr_references(expression)
1137 )
1138 if not (
1139 db_connection.features.supports_partial_indexes
1140 or "supports_partial_indexes" in cls.model_options.required_db_features
1141 ) and any(index.condition is not None for index in cls.model_options.indexes):
1142 errors.append(
1143 PreflightResult(
1144 fix=f"{db_connection.display_name} does not support indexes with conditions. "
1145 "Conditions will be ignored. Silence this warning "
1146 "if you don't care about it.",
1147 warning=True,
1148 obj=cls,
1149 id="models.index_conditions_ignored",
1150 )
1151 )
1152 if not (
1153 db_connection.features.supports_covering_indexes
1154 or "supports_covering_indexes" in cls.model_options.required_db_features
1155 ) and any(index.include for index in cls.model_options.indexes):
1156 errors.append(
1157 PreflightResult(
1158 fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
1159 "Non-key columns will be ignored. Silence this "
1160 "warning if you don't care about it.",
1161 warning=True,
1162 obj=cls,
1163 id="models.index_non_key_columns_ignored",
1164 )
1165 )
1166 if not (
1167 db_connection.features.supports_expression_indexes
1168 or "supports_expression_indexes" in cls.model_options.required_db_features
1169 ) and any(index.contains_expressions for index in cls.model_options.indexes):
1170 errors.append(
1171 PreflightResult(
1172 fix=f"{db_connection.display_name} does not support indexes on expressions. "
1173 "An index won't be created. Silence this warning "
1174 "if you don't care about it.",
1175 warning=True,
1176 obj=cls,
1177 id="models.index_on_foreign_key",
1178 )
1179 )
1180 fields = [
1181 field
1182 for index in cls.model_options.indexes
1183 for field, _ in index.fields_orders
1184 ]
1185 fields += [
1186 include for index in cls.model_options.indexes for include in index.include
1187 ]
1188 fields += references
1189 errors.extend(cls._check_local_fields(fields, "indexes"))
1190 return errors
1191
1192 @classmethod
1193 def _check_local_fields(
1194 cls, fields: Iterable[str], option: str
1195 ) -> list[PreflightResult]:
1196 # In order to avoid hitting the relation tree prematurely, we use our
1197 # own fields_map instead of using get_field()
1198 forward_fields_map: dict[str, Field] = {}
1199 for field in cls._model_meta._get_fields(reverse=False):
1200 forward_fields_map[field.name] = field
1201 if hasattr(field, "attname"):
1202 forward_fields_map[field.attname] = field
1203
1204 errors: list[PreflightResult] = []
1205 for field_name in fields:
1206 try:
1207 field = forward_fields_map[field_name]
1208 except KeyError:
1209 errors.append(
1210 PreflightResult(
1211 fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1212 obj=cls,
1213 id="models.nonexistent_field_reference",
1214 )
1215 )
1216 else:
1217 from plain.models.fields.related import ManyToManyField
1218
1219 if isinstance(field, ManyToManyField):
1220 errors.append(
1221 PreflightResult(
1222 fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1223 f"ManyToManyFields are not permitted in '{option}'.",
1224 obj=cls,
1225 id="models.m2m_field_in_meta_option",
1226 )
1227 )
1228 elif field not in cls._model_meta.local_fields:
1229 errors.append(
1230 PreflightResult(
1231 fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1232 f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1233 obj=cls,
1234 id="models.non_local_field_reference",
1235 )
1236 )
1237 return errors
1238
1239 @classmethod
1240 def _check_ordering(cls) -> list[PreflightResult]:
1241 """
1242 Check "ordering" option -- is it a list of strings and do all fields
1243 exist?
1244 """
1245
1246 if not cls.model_options.ordering:
1247 return []
1248
1249 if not isinstance(cls.model_options.ordering, list | tuple):
1250 return [
1251 PreflightResult(
1252 fix="'ordering' must be a tuple or list (even if you want to order by "
1253 "only one field).",
1254 obj=cls,
1255 id="models.ordering_not_tuple_or_list",
1256 )
1257 ]
1258
1259 errors: list[PreflightResult] = []
1260 fields = cls.model_options.ordering
1261
1262 # Skip expressions and '?' fields.
1263 fields = (f for f in fields if isinstance(f, str) and f != "?")
1264
1265 # Convert "-field" to "field".
1266 fields = (f.removeprefix("-") for f in fields)
1267
1268 # Separate related fields and non-related fields.
1269 _fields = []
1270 related_fields = []
1271 for f in fields:
1272 if LOOKUP_SEP in f:
1273 related_fields.append(f)
1274 else:
1275 _fields.append(f)
1276 fields = _fields
1277
1278 # Check related fields.
1279 for field in related_fields:
1280 _cls = cls
1281 fld = None
1282 for part in field.split(LOOKUP_SEP):
1283 try:
1284 fld = _cls._model_meta.get_field(part)
1285 if isinstance(fld, RelatedField):
1286 _cls = fld.path_infos[-1].to_meta.model
1287 else:
1288 _cls = None
1289 except (FieldDoesNotExist, AttributeError):
1290 if fld is None or (
1291 fld.get_transform(part) is None and fld.get_lookup(part) is None
1292 ):
1293 errors.append(
1294 PreflightResult(
1295 fix="'ordering' refers to the nonexistent field, "
1296 f"related field, or lookup '{field}'.",
1297 obj=cls,
1298 id="models.ordering_nonexistent_field",
1299 )
1300 )
1301
1302 # Check for invalid or nonexistent fields in ordering.
1303 invalid_fields = []
1304
1305 # Any field name that is not present in field_names does not exist.
1306 # Also, ordering by m2m fields is not allowed.
1307 meta = cls._model_meta
1308 valid_fields = set(
1309 chain.from_iterable(
1310 (f.name, f.attname)
1311 if not (f.auto_created and not f.concrete)
1312 else (f.field.related_query_name(),)
1313 for f in chain(meta.fields, meta.related_objects)
1314 )
1315 )
1316
1317 invalid_fields.extend(set(fields) - valid_fields)
1318
1319 for invalid_field in invalid_fields:
1320 errors.append(
1321 PreflightResult(
1322 fix="'ordering' refers to the nonexistent field, related "
1323 f"field, or lookup '{invalid_field}'.",
1324 obj=cls,
1325 id="models.ordering_nonexistent_field",
1326 )
1327 )
1328 return errors
1329
1330 @classmethod
1331 def _check_long_column_names(cls) -> list[PreflightResult]:
1332 """
1333 Check that any auto-generated column names are shorter than the limits
1334 for each database in which the model will be created.
1335 """
1336 errors: list[PreflightResult] = []
1337 allowed_len = None
1338
1339 max_name_length = db_connection.ops.max_name_length()
1340 if max_name_length is not None and not db_connection.features.truncates_names:
1341 allowed_len = max_name_length
1342
1343 if allowed_len is None:
1344 return errors
1345
1346 for f in cls._model_meta.local_fields:
1347 _, column_name = f.get_attname_column()
1348
1349 # Check if auto-generated name for the field is too long
1350 # for the database.
1351 if (
1352 f.db_column is None
1353 and column_name is not None
1354 and len(column_name) > allowed_len
1355 ):
1356 errors.append(
1357 PreflightResult(
1358 fix=f'Autogenerated column name too long for field "{column_name}". '
1359 f'Maximum length is "{allowed_len}" for the database. '
1360 "Set the column name manually using 'db_column'.",
1361 obj=cls,
1362 id="models.autogenerated_column_name_too_long",
1363 )
1364 )
1365
1366 for f in cls._model_meta.local_many_to_many:
1367 # Skip nonexistent models.
1368 if isinstance(f.remote_field.through, str):
1369 continue
1370
1371 # Check if auto-generated name for the M2M field is too long
1372 # for the database.
1373 for m2m in f.remote_field.through._model_meta.local_fields:
1374 _, rel_name = m2m.get_attname_column()
1375 if (
1376 m2m.db_column is None
1377 and rel_name is not None
1378 and len(rel_name) > allowed_len
1379 ):
1380 errors.append(
1381 PreflightResult(
1382 fix="Autogenerated column name too long for M2M field "
1383 f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
1384 "Use 'through' to create a separate model for "
1385 "M2M and then set column_name using 'db_column'.",
1386 obj=cls,
1387 id="models.m2m_column_name_too_long",
1388 )
1389 )
1390
1391 return errors
1392
1393 @classmethod
1394 def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1395 if isinstance(expr, Q):
1396 for child in expr.children:
1397 if isinstance(child, tuple):
1398 lookup, value = child
1399 yield tuple(lookup.split(LOOKUP_SEP))
1400 yield from cls._get_expr_references(value)
1401 else:
1402 yield from cls._get_expr_references(child)
1403 elif isinstance(expr, F):
1404 yield tuple(expr.name.split(LOOKUP_SEP))
1405 elif hasattr(expr, "get_source_expressions"):
1406 for src_expr in expr.get_source_expressions():
1407 yield from cls._get_expr_references(src_expr)
1408
1409 @classmethod
1410 def _check_constraints(cls) -> list[PreflightResult]:
1411 errors: list[PreflightResult] = []
1412 if not (
1413 db_connection.features.supports_table_check_constraints
1414 or "supports_table_check_constraints"
1415 in cls.model_options.required_db_features
1416 ) and any(
1417 isinstance(constraint, CheckConstraint)
1418 for constraint in cls.model_options.constraints
1419 ):
1420 errors.append(
1421 PreflightResult(
1422 fix=f"{db_connection.display_name} does not support check constraints. "
1423 "A constraint won't be created. Silence this "
1424 "warning if you don't care about it.",
1425 obj=cls,
1426 id="models.constraint_on_non_db_field",
1427 warning=True,
1428 )
1429 )
1430
1431 if not (
1432 db_connection.features.supports_partial_indexes
1433 or "supports_partial_indexes" in cls.model_options.required_db_features
1434 ) and any(
1435 isinstance(constraint, UniqueConstraint)
1436 and constraint.condition is not None
1437 for constraint in cls.model_options.constraints
1438 ):
1439 errors.append(
1440 PreflightResult(
1441 fix=f"{db_connection.display_name} does not support unique constraints with "
1442 "conditions. A constraint won't be created. Silence this "
1443 "warning if you don't care about it.",
1444 obj=cls,
1445 id="models.constraint_on_virtual_field",
1446 warning=True,
1447 )
1448 )
1449
1450 if not (
1451 db_connection.features.supports_deferrable_unique_constraints
1452 or "supports_deferrable_unique_constraints"
1453 in cls.model_options.required_db_features
1454 ) and any(
1455 isinstance(constraint, UniqueConstraint)
1456 and constraint.deferrable is not None
1457 for constraint in cls.model_options.constraints
1458 ):
1459 errors.append(
1460 PreflightResult(
1461 fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
1462 "A constraint won't be created. Silence this "
1463 "warning if you don't care about it.",
1464 obj=cls,
1465 id="models.constraint_on_foreign_key",
1466 warning=True,
1467 )
1468 )
1469
1470 if not (
1471 db_connection.features.supports_covering_indexes
1472 or "supports_covering_indexes" in cls.model_options.required_db_features
1473 ) and any(
1474 isinstance(constraint, UniqueConstraint) and constraint.include
1475 for constraint in cls.model_options.constraints
1476 ):
1477 errors.append(
1478 PreflightResult(
1479 fix=f"{db_connection.display_name} does not support unique constraints with non-key "
1480 "columns. A constraint won't be created. Silence this "
1481 "warning if you don't care about it.",
1482 obj=cls,
1483 id="models.constraint_on_m2m_field",
1484 warning=True,
1485 )
1486 )
1487
1488 if not (
1489 db_connection.features.supports_expression_indexes
1490 or "supports_expression_indexes" in cls.model_options.required_db_features
1491 ) and any(
1492 isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
1493 for constraint in cls.model_options.constraints
1494 ):
1495 errors.append(
1496 PreflightResult(
1497 fix=f"{db_connection.display_name} does not support unique constraints on "
1498 "expressions. A constraint won't be created. Silence this "
1499 "warning if you don't care about it.",
1500 obj=cls,
1501 id="models.constraint_on_self_referencing_fk",
1502 warning=True,
1503 )
1504 )
1505 fields = set(
1506 chain.from_iterable(
1507 (*constraint.fields, *constraint.include)
1508 for constraint in cls.model_options.constraints
1509 if isinstance(constraint, UniqueConstraint)
1510 )
1511 )
1512 references = set()
1513 for constraint in cls.model_options.constraints:
1514 if isinstance(constraint, UniqueConstraint):
1515 if (
1516 db_connection.features.supports_partial_indexes
1517 or "supports_partial_indexes"
1518 not in cls.model_options.required_db_features
1519 ) and isinstance(constraint.condition, Q):
1520 references.update(cls._get_expr_references(constraint.condition))
1521 if (
1522 db_connection.features.supports_expression_indexes
1523 or "supports_expression_indexes"
1524 not in cls.model_options.required_db_features
1525 ) and constraint.contains_expressions:
1526 for expression in constraint.expressions:
1527 references.update(cls._get_expr_references(expression))
1528 elif isinstance(constraint, CheckConstraint):
1529 if (
1530 db_connection.features.supports_table_check_constraints
1531 or "supports_table_check_constraints"
1532 not in cls.model_options.required_db_features
1533 ):
1534 if isinstance(constraint.check, Q):
1535 references.update(cls._get_expr_references(constraint.check))
1536 if any(
1537 isinstance(expr, RawSQL) for expr in constraint.check.flatten()
1538 ):
1539 errors.append(
1540 PreflightResult(
1541 fix=f"Check constraint {constraint.name!r} contains "
1542 f"RawSQL() expression and won't be validated "
1543 f"during the model full_clean(). "
1544 "Silence this warning if you don't care about it.",
1545 warning=True,
1546 obj=cls,
1547 id="models.constraint_name_collision_autogenerated",
1548 ),
1549 )
1550 for field_name, *lookups in references:
1551 fields.add(field_name)
1552 if not lookups:
1553 # If it has no lookups it cannot result in a JOIN.
1554 continue
1555 try:
1556 field = cls._model_meta.get_field(field_name)
1557 from plain.models.fields.related import ManyToManyField
1558 from plain.models.fields.reverse_related import ForeignKeyRel
1559
1560 if (
1561 not isinstance(field, RelatedField)
1562 or isinstance(field, ManyToManyField)
1563 or isinstance(field, ForeignKeyRel)
1564 ):
1565 continue
1566 except FieldDoesNotExist:
1567 continue
1568 # JOIN must happen at the first lookup.
1569 first_lookup = lookups[0]
1570 if (
1571 hasattr(field, "get_transform")
1572 and hasattr(field, "get_lookup")
1573 and field.get_transform(first_lookup) is None
1574 and field.get_lookup(first_lookup) is None
1575 ):
1576 errors.append(
1577 PreflightResult(
1578 fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1579 obj=cls,
1580 id="models.constraint_refers_to_joined_field",
1581 )
1582 )
1583 errors.extend(cls._check_local_fields(fields, "constraints"))
1584 return errors
1585
1586
1587########
1588# MISC #
1589########
1590
1591
1592def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
1593 """Used to unpickle Model subclasses with deferred fields."""
1594 if isinstance(model_id, tuple):
1595 model = models_registry.get_model(*model_id)
1596 else:
1597 # Backwards compat - the model was cached directly in earlier versions.
1598 model = model_id
1599 return model.__new__(model)
1600
1601
1602# Pickle protocol marker - functions don't normally have this attribute
1603model_unpickle.__safe_for_unpickle__ = True # type: ignore[attr-defined]