1from __future__ import annotations
2
3import copy
4import warnings
5from collections.abc import Iterable, Iterator, Sequence
6from itertools import chain
7from typing import TYPE_CHECKING, Any, cast
8
9if TYPE_CHECKING:
10 from plain.models.meta import Meta
11 from plain.models.options import Options
12
13import plain.runtime
14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
15from plain.models import models_registry, transaction, types
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21 db_connection,
22)
23from plain.models.deletion import Collector
24from plain.models.exceptions import (
25 DoesNotExistDescriptor,
26 FieldDoesNotExist,
27 MultipleObjectsReturnedDescriptor,
28)
29from plain.models.expressions import RawSQL, Value
30from plain.models.fields import NOT_PROVIDED, Field
31from plain.models.fields.related import RelatedField
32from plain.models.fields.reverse_related import ForeignObjectRel
33from plain.models.meta import Meta
34from plain.models.options import Options
35from plain.models.query import F, Q, QuerySet
36from plain.preflight import PreflightResult
37from plain.utils.encoding import force_str
38from plain.utils.hashable import make_hashable
39
40
class Deferred:
    """Type of the module-level ``DEFERRED`` marker; always prints as ``<Deferred field>``."""

    def __str__(self) -> str:
        return "<Deferred field>"

    # repr() and str() intentionally render the same text.
    __repr__ = __str__
47
48
49DEFERRED = Deferred()
50
51
class ModelBase(type):
    """Metaclass for all models."""

    def __new__(
        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
    ) -> type:
        """
        Create the class, refusing any base that is a model other than Model.

        Raises TypeError when one of ``bases`` is a subclass of ``Model`` that
        is not ``Model`` itself.
        """
        if bases:
            # Models are required to directly inherit from Model, not from a
            # subclass of it; report the first offending base.
            offending = next(
                (b for b in bases if issubclass(b, Model) and b is not Model),
                None,
            )
            if offending is not None:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {offending}"
                )
            return super().__new__(cls, name, bases, attrs, **kwargs)

        # No bases: this is the root Model class itself, so none of the checks
        # apply (extra class keyword arguments are not forwarded here).
        return super().__new__(cls, name, bases, attrs)
70
71
class ModelStateFieldsCacheDescriptor:
    """Lazily give each owning instance its own empty ``fields_cache`` dict."""

    def __get__(
        self, instance: ModelState | None, cls: type | None = None
    ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
        """
        Return the descriptor on class access; on instance access, store a
        new dict on the instance as ``fields_cache`` and return it.
        """
        if instance is not None:
            # Only __get__ is defined here, so the value stored on the
            # instance is what later attribute lookups will find.
            cache: dict[str, Any] = {}
            instance.fields_cache = cache
            return cache
        return self
80
81
class ModelState:
    """Store model instance state."""

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs.
    # Besides validation, Model._save_table() also reads this flag when
    # deciding whether to skip the UPDATE attempt and go straight to INSERT.
    # This is the class-level default; Model.from_db() and Model.save_base()
    # set it to False on the instance.
    adding = True
    # Descriptor that hands each instance its own empty dict on first access.
    fields_cache = ModelStateFieldsCacheDescriptor()
91
92
93class Model(metaclass=ModelBase):
    # Every model gets an automatic id field
    id: int = types.PrimaryKeyField()

    # Descriptors for other model behavior
    # `query` is what the uniqueness checks call `.filter()` on.
    query: QuerySet[Model] = QuerySet()
    # Read by methods of this class for `package_label`, `object_name`,
    # `constraints` and `db_table_comment`.
    model_options: Options = Options()
    # Field metadata (`fields`, `concrete_fields`, `get_forward_field`, ...)
    # consulted throughout this class.
    _model_meta: Meta = Meta()
    # NOTE(review): presumably expose per-model exception classes — confirm
    # against plain.models.exceptions.
    DoesNotExist = DoesNotExistDescriptor()
    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
103
104 def __init__(self, **kwargs: Any):
105 # Alias some things as locals to avoid repeat global lookups
106 cls = self.__class__
107 meta = cls._model_meta
108 _setattr = setattr
109 _DEFERRED = DEFERRED
110
111 # Set up the storage for instance state
112 self._state = ModelState()
113
114 # Process all fields from kwargs or use defaults
115 for field in meta.fields:
116 from plain.models.fields.related import RelatedField
117
118 is_related_object = False
119 # Virtual field
120 if field.attname not in kwargs and field.column is None:
121 continue
122 if isinstance(field, RelatedField) and isinstance(
123 field.remote_field, ForeignObjectRel
124 ):
125 try:
126 # Assume object instance was passed in.
127 rel_obj = kwargs.pop(field.name)
128 is_related_object = True
129 except KeyError:
130 try:
131 # Object instance wasn't passed in -- must be an ID.
132 val = kwargs.pop(field.attname)
133 except KeyError:
134 val = field.get_default()
135 else:
136 try:
137 val = kwargs.pop(field.attname)
138 except KeyError:
139 # This is done with an exception rather than the
140 # default argument on pop because we don't want
141 # get_default() to be evaluated, and then not used.
142 # Refs #12057.
143 val = field.get_default()
144
145 if is_related_object:
146 # If we are passed a related instance, set it using the
147 # field.name instead of field.attname (e.g. "user" instead of
148 # "user_id") so that the object gets properly cached (and type
149 # checked) by the RelatedObjectDescriptor.
150 if rel_obj is not _DEFERRED:
151 _setattr(self, field.name, rel_obj)
152 else:
153 if val is not _DEFERRED:
154 _setattr(self, field.attname, val)
155
156 # Handle any remaining kwargs (properties or virtual fields)
157 property_names = meta._property_names
158 unexpected = ()
159 for prop, value in kwargs.items():
160 # Any remaining kwargs must correspond to properties or virtual
161 # fields.
162 if prop in property_names:
163 if value is not _DEFERRED:
164 _setattr(self, prop, value)
165 else:
166 try:
167 meta.get_field(prop)
168 except FieldDoesNotExist:
169 unexpected += (prop,)
170 else:
171 if value is not _DEFERRED:
172 _setattr(self, prop, value)
173 if unexpected:
174 unexpected_names = ", ".join(repr(n) for n in unexpected)
175 raise TypeError(
176 f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
177 )
178
179 super().__init__()
180
181 @classmethod
182 def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
183 if len(values) != len(cls._model_meta.concrete_fields):
184 values_iter = iter(values)
185 values = [
186 next(values_iter) if f.attname in field_names else DEFERRED
187 for f in cls._model_meta.concrete_fields
188 ]
189 # Build kwargs dict from field names and values
190 field_dict = dict(
191 zip((f.attname for f in cls._model_meta.concrete_fields), values)
192 )
193 new = cls(**field_dict)
194 new._state.adding = False
195 return new
196
197 def __repr__(self) -> str:
198 return f"<{self.__class__.__name__}: {self}>"
199
200 def __str__(self) -> str:
201 return f"{self.__class__.__name__} object ({self.id})"
202
203 def __eq__(self, other: object) -> bool:
204 if not isinstance(other, Model):
205 return NotImplemented
206 if self.__class__ != other.__class__:
207 return False
208 my_id = self.id
209 if my_id is None:
210 return self is other
211 return my_id == other.id
212
213 def __hash__(self) -> int:
214 if self.id is None:
215 raise TypeError("Model instances without primary key value are unhashable")
216 return hash(self.id)
217
218 def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
219 data = self.__getstate__()
220 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
221 class_id = (
222 self.model_options.package_label,
223 self.model_options.object_name,
224 )
225 return model_unpickle, (class_id,), data
226
227 def __getstate__(self) -> dict[str, Any]:
228 """Hook to allow choosing the attributes to pickle."""
229 state = self.__dict__.copy()
230 state["_state"] = copy.copy(state["_state"])
231 state["_state"].fields_cache = state["_state"].fields_cache.copy()
232 # memoryview cannot be pickled, so cast it to bytes and store
233 # separately.
234 _memoryview_attrs = []
235 for attr, value in state.items():
236 if isinstance(value, memoryview):
237 _memoryview_attrs.append((attr, bytes(value)))
238 if _memoryview_attrs:
239 state["_memoryview_attrs"] = _memoryview_attrs
240 for attr, value in _memoryview_attrs:
241 state.pop(attr)
242 return state
243
244 def __setstate__(self, state: dict[str, Any]) -> None:
245 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
246 if pickled_version:
247 if pickled_version != plain.runtime.__version__:
248 warnings.warn(
249 f"Pickled model instance's Plain version {pickled_version} does not "
250 f"match the current version {plain.runtime.__version__}.",
251 RuntimeWarning,
252 stacklevel=2,
253 )
254 else:
255 warnings.warn(
256 "Pickled model instance's Plain version is not specified.",
257 RuntimeWarning,
258 stacklevel=2,
259 )
260 if "_memoryview_attrs" in state:
261 for attr, value in state.pop("_memoryview_attrs"):
262 state[attr] = memoryview(value)
263 self.__dict__.update(state)
264
265 def get_deferred_fields(self) -> set[str]:
266 """
267 Return a set containing names of deferred fields for this instance.
268 """
269 return {
270 f.attname
271 for f in self._model_meta.concrete_fields
272 if f.attname not in self.__dict__
273 }
274
275 def refresh_from_db(self, fields: list[str] | None = None) -> None:
276 """
277 Reload field values from the database.
278
279 By default, the reloading happens from the database this instance was
280 loaded from, or by the read router if this instance wasn't loaded from
281 any database. The using parameter will override the default.
282
283 Fields can be used to specify which fields to reload. The fields
284 should be an iterable of field attnames. If fields is None, then
285 all non-deferred fields are reloaded.
286
287 When accessing deferred fields of an instance, the deferred loading
288 of the field will call this method.
289 """
290 if fields is None:
291 self._prefetched_objects_cache = {}
292 else:
293 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", {})
294 for field in fields:
295 if field in prefetched_objects_cache:
296 del prefetched_objects_cache[field]
297 fields.remove(field)
298 if not fields:
299 return
300 if any(LOOKUP_SEP in f for f in fields):
301 raise ValueError(
302 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
303 "are not allowed in fields."
304 )
305
306 db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
307
308 # Use provided fields, if not set then reload all non-deferred fields.
309 deferred_fields = self.get_deferred_fields()
310 if fields is not None:
311 fields = list(fields)
312 db_instance_qs = db_instance_qs.only(*fields)
313 elif deferred_fields:
314 fields = [
315 f.attname
316 for f in self._model_meta.concrete_fields
317 if f.attname not in deferred_fields
318 ]
319 db_instance_qs = db_instance_qs.only(*fields)
320
321 db_instance = db_instance_qs.get()
322 non_loaded_fields = db_instance.get_deferred_fields()
323 for field in self._model_meta.concrete_fields:
324 if field.attname in non_loaded_fields:
325 # This field wasn't refreshed - skip ahead.
326 continue
327 setattr(self, field.attname, getattr(db_instance, field.attname))
328 # Clear cached foreign keys.
329 if isinstance(field, RelatedField) and field.is_cached(self):
330 field.delete_cached_value(self)
331
332 # Clear cached relations.
333 for field in self._model_meta.related_objects:
334 if field.is_cached(self):
335 field.delete_cached_value(self)
336
337 def serializable_value(self, field_name: str) -> Any:
338 """
339 Return the value of the field name for this instance. If the field is
340 a foreign key, return the id value instead of the object. If there's
341 no Field object with this name on the model, return the model
342 attribute's value.
343
344 Used to serialize a field's value (in the serializer, or form output,
345 for example). Normally, you would just access the attribute directly
346 and not use this method.
347 """
348 try:
349 field = self._model_meta.get_forward_field(field_name)
350 except FieldDoesNotExist:
351 return getattr(self, field_name)
352 return getattr(self, field.attname)
353
354 def save(
355 self,
356 *,
357 clean_and_validate: bool = True,
358 force_insert: bool = False,
359 force_update: bool = False,
360 update_fields: Iterable[str] | None = None,
361 ) -> None:
362 """
363 Save the current instance. Override this in a subclass if you want to
364 control the saving process.
365
366 The 'force_insert' and 'force_update' parameters can be used to insist
367 that the "save" must be an SQL insert or update (or equivalent for
368 non-SQL backends), respectively. Normally, they should not be set.
369 """
370 self._prepare_related_fields_for_save(operation_name="save")
371
372 if force_insert and (force_update or update_fields):
373 raise ValueError("Cannot force both insert and updating in model saving.")
374
375 deferred_fields = self.get_deferred_fields()
376 if update_fields is not None:
377 # If update_fields is empty, skip the save. We do also check for
378 # no-op saves later on for inheritance cases. This bailout is
379 # still needed for skipping signal sending.
380 if not update_fields:
381 return
382
383 update_fields = frozenset(update_fields)
384 field_names = self._model_meta._non_pk_concrete_field_names
385 non_model_fields = update_fields.difference(field_names)
386
387 if non_model_fields:
388 raise ValueError(
389 "The following fields do not exist in this model, are m2m "
390 "fields, or are non-concrete fields: {}".format(
391 ", ".join(non_model_fields)
392 )
393 )
394
395 # If this model is deferred, automatically do an "update_fields" save
396 # on the loaded fields.
397 elif not force_insert and deferred_fields:
398 field_names = set()
399 for field in self._model_meta.concrete_fields:
400 if not field.primary_key and not hasattr(field, "through"):
401 field_names.add(field.attname)
402 loaded_fields = field_names.difference(deferred_fields)
403 if loaded_fields:
404 update_fields = frozenset(loaded_fields)
405
406 if clean_and_validate:
407 self.full_clean(exclude=deferred_fields)
408
409 self.save_base(
410 force_insert=force_insert,
411 force_update=force_update,
412 update_fields=update_fields,
413 )
414
415 def save_base(
416 self,
417 *,
418 raw: bool = False,
419 force_insert: bool = False,
420 force_update: bool = False,
421 update_fields: Iterable[str] | None = None,
422 ) -> None:
423 """
424 Handle the parts of saving which should be done only once per save,
425 yet need to be done in raw saves, too. This includes some sanity
426 checks and signal sending.
427
428 The 'raw' argument is telling save_base not to save any parent
429 models and not to do any changes to the values before save. This
430 is used by fixture loading.
431 """
432 assert not (force_insert and (force_update or update_fields))
433 assert update_fields is None or update_fields
434 cls = self.__class__
435
436 with transaction.mark_for_rollback_on_error():
437 self._save_table(
438 raw=raw,
439 cls=cls,
440 force_insert=force_insert,
441 force_update=force_update,
442 update_fields=update_fields,
443 )
444 # Once saved, this is no longer a to-be-added instance.
445 self._state.adding = False
446
    def _save_table(
        self,
        *,
        raw: bool,
        cls: type[Model],
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> bool:
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.

        An UPDATE is attempted first when the instance has an id and an
        insert is not forced; if no row was updated, an INSERT follows and
        any returned values are copied back onto the instance.

        Returns True when an UPDATE matched a row, False when an INSERT was
        performed.

        Raises:
            ValueError: an update was requested but no id is available.
            DatabaseError: ``force_update`` or ``update_fields`` was given
                and the UPDATE affected no rows.
        """
        meta = cls._model_meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        # Restrict the UPDATE to the requested fields (matched by either
        # name or attname).
        if update_fields:
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            # Give the id field a chance to supply a value before saving.
            id_field = meta.get_forward_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_forward_field("id").default
            and meta.get_forward_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            # (field, None, value) triples; in raw mode the stored attribute
            # is used as-is, otherwise the field's pre_save() supplies it.
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = bool(update_fields or force_update)
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                # No id value to send: leave the id field out of the INSERT.
                id_field = meta.get_forward_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                # Copy the first returned row back onto the instance.
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated
519
520 def _do_update(
521 self,
522 base_qs: QuerySet,
523 id_val: Any,
524 values: list[tuple[Any, Any, Any]],
525 update_fields: Iterable[str] | None,
526 forced_update: bool,
527 ) -> bool:
528 """
529 Try to update the model. Return True if the model was updated (if an
530 update query was done and a matching row was found in the DB).
531 """
532 filtered = base_qs.filter(id=id_val)
533 if not values:
534 # We can end up here when saving a model in inheritance chain where
535 # update_fields doesn't target any field in current model. In that
536 # case we just say the update succeeded. Another case ending up here
537 # is a model with just PK - in that case check that the PK still
538 # exists.
539 return update_fields is not None or filtered.exists()
540 return filtered._update(values) > 0
541
542 def _do_insert(
543 self,
544 manager: QuerySet,
545 fields: Sequence[Any],
546 returning_fields: Sequence[Any],
547 raw: bool,
548 ) -> list[tuple[Any, ...]] | None:
549 """
550 Do an INSERT. If returning_fields is defined then this method should
551 return the newly created data for the model.
552 """
553 return manager._insert(
554 [self],
555 fields=list(fields),
556 returning_fields=list(returning_fields) if returning_fields else None,
557 raw=raw,
558 )
559
    def _prepare_related_fields_for_save(
        self, operation_name: str, fields: Sequence[Any] | None = None
    ) -> None:
        """
        Check cached related objects on this instance before a write.

        Only concrete relation fields with a cached object are examined,
        limited to ``fields`` when that is given and non-empty.
        ``operation_name`` is used only in the error message.

        Raises:
            ValueError: a cached related object has no id yet.
        """
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKeyField on this model. If the field is nullable, allowing
        # the save would result in silent data loss.
        for field in self._model_meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if isinstance(field, RelatedField) and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment (re-assigning goes through the field's
                    # attribute again now that the object has an id).
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)
598
599 def delete(self) -> tuple[int, dict[str, int]]:
600 if self.id is None:
601 raise ValueError(
602 f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
603 "to None."
604 )
605 collector = Collector(origin=self)
606 collector.collect([self])
607 return collector.delete()
608
609 def get_field_display(self, field_name: str) -> str:
610 """Get the display value for a field, especially useful for fields with choices."""
611 # Get the field object from the field name
612 field = self._model_meta.get_forward_field(field_name)
613 value = getattr(self, field.attname)
614
615 # If field has no choices, just return the value as string
616 if not hasattr(field, "flatchoices") or not field.flatchoices:
617 return force_str(value, strings_only=True)
618
619 # For fields with choices, look up the display value
620 choices_dict = dict(make_hashable(field.flatchoices))
621 return force_str(
622 choices_dict.get(make_hashable(value), value), strings_only=True
623 )
624
625 def _get_field_value_map(
626 self, meta: Meta | None, exclude: set[str] | None = None
627 ) -> dict[str, Value]:
628 if exclude is None:
629 exclude = set()
630 meta = meta or self._model_meta
631 return {
632 field.name: Value(getattr(self, field.attname), field)
633 for field in meta.local_concrete_fields
634 if field.name not in exclude
635 }
636
637 def prepare_database_save(self, field: Any) -> Any:
638 if self.id is None:
639 raise ValueError(
640 f"Unsaved model instance {self!r} cannot be used in an ORM query."
641 )
642 return getattr(self, field.remote_field.get_related_field().attname)
643
644 def clean(self) -> None:
645 """
646 Hook for doing any extra model-wide validation after clean() has been
647 called on every field by self.clean_fields. Any ValidationError raised
648 by this method will not be associated with a particular field; it will
649 have a special-case association with the field defined by NON_FIELD_ERRORS.
650 """
651 pass
652
653 def validate_unique(self, exclude: set[str] | None = None) -> None:
654 """
655 Check unique constraints on the model and raise ValidationError if any
656 failed.
657 """
658 unique_checks = self._get_unique_checks(exclude=exclude)
659
660 if errors := self._perform_unique_checks(unique_checks):
661 raise ValidationError(errors)
662
663 def _get_unique_checks(
664 self, exclude: set[str] | None = None
665 ) -> list[tuple[type[Model], tuple[str, ...]]]:
666 """
667 Return a list of checks to perform. Since validate_unique() could be
668 called from a ModelForm, some fields may have been excluded; we can't
669 perform a unique check on a model that is missing fields involved
670 in that check. Fields that did not validate should also be excluded,
671 but they need to be passed in via the exclude argument.
672 """
673 if exclude is None:
674 exclude = set()
675 unique_checks = []
676
677 # Gather a list of checks for fields declared as unique and add them to
678 # the list of checks.
679
680 fields_with_class = [(self.__class__, self._model_meta.local_fields)]
681
682 for model_class, fields in fields_with_class:
683 for f in fields:
684 name = f.name
685 if name in exclude:
686 continue
687 if f.primary_key:
688 unique_checks.append((model_class, (name,)))
689
690 return unique_checks
691
692 def _perform_unique_checks(
693 self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
694 ) -> dict[str, list[ValidationError]]:
695 errors = {}
696
697 for model_class, unique_check in unique_checks:
698 # Try to look up an existing object with the same values as this
699 # object's values for all the unique field.
700
701 lookup_kwargs = {}
702 for field_name in unique_check:
703 f = self._model_meta.get_forward_field(field_name)
704 lookup_value = getattr(self, f.attname)
705 # TODO: Handle multiple backends with different feature flags.
706 if lookup_value is None:
707 # no value, skip the lookup
708 continue
709 if f.primary_key and not self._state.adding:
710 # no need to check for unique primary key when editing
711 continue
712 lookup_kwargs[str(field_name)] = lookup_value
713
714 # some fields were skipped, no reason to do the check
715 if len(unique_check) != len(lookup_kwargs):
716 continue
717
718 qs = model_class.query.filter(**lookup_kwargs)
719
720 # Exclude the current object from the query if we are editing an
721 # instance (as opposed to creating a new one)
722 # Use the primary key defined by model_class. In previous versions
723 # this could differ from `self.id` due to model inheritance.
724 model_class_id = getattr(self, "id")
725 if not self._state.adding and model_class_id is not None:
726 qs = qs.exclude(id=model_class_id)
727 if qs.exists():
728 if len(unique_check) == 1:
729 key = unique_check[0]
730 else:
731 key = NON_FIELD_ERRORS
732 errors.setdefault(key, []).append(
733 self.unique_error_message(model_class, unique_check)
734 )
735
736 return errors
737
738 def unique_error_message(
739 self, model_class: type[Model], unique_check: tuple[str, ...]
740 ) -> ValidationError:
741 meta = model_class._model_meta
742
743 params = {
744 "model": self,
745 "model_class": model_class,
746 "model_name": model_class.model_options.model_name,
747 "unique_check": unique_check,
748 }
749
750 if len(unique_check) == 1:
751 field = meta.get_forward_field(unique_check[0])
752 params["field_label"] = field.name
753 return ValidationError(
754 message=field.error_messages["unique"],
755 code="unique",
756 params=params,
757 )
758 else:
759 field_names = [meta.get_forward_field(f).name for f in unique_check]
760
761 # Put an "and" before the last one
762 field_names[-1] = f"and {field_names[-1]}"
763
764 if len(field_names) > 2:
765 # Comma join if more than 2
766 params["field_label"] = ", ".join(cast(list[str], field_names))
767 else:
768 # Just a space if there are only 2
769 params["field_label"] = " ".join(cast(list[str], field_names))
770
771 # Use the first field as the message format...
772 message = meta.get_forward_field(unique_check[0]).error_messages["unique"]
773
774 return ValidationError(
775 message=message,
776 code="unique",
777 params=params,
778 )
779
780 def get_constraints(self) -> list[tuple[type[Model], list[Any]]]:
781 constraints = [(self.__class__, list(self.model_options.constraints))]
782 return constraints
783
784 def validate_constraints(self, exclude: set[str] | None = None) -> None:
785 constraints = self.get_constraints()
786
787 errors = {}
788 for model_class, model_constraints in constraints:
789 for constraint in model_constraints:
790 try:
791 constraint.validate(model_class, self, exclude=exclude)
792 except ValidationError as e:
793 if (
794 getattr(e, "code", None) == "unique"
795 and len(constraint.fields) == 1
796 ):
797 errors.setdefault(constraint.fields[0], []).append(e)
798 else:
799 errors = e.update_error_dict(errors)
800 if errors:
801 raise ValidationError(errors)
802
803 def full_clean(
804 self,
805 *,
806 exclude: set[str] | Iterable[str] | None = None,
807 validate_unique: bool = True,
808 validate_constraints: bool = True,
809 ) -> None:
810 """
811 Call clean_fields(), clean(), validate_unique(), and
812 validate_constraints() on the model. Raise a ValidationError for any
813 errors that occur.
814 """
815 errors = {}
816 if exclude is None:
817 exclude = set()
818 else:
819 exclude = set(exclude)
820
821 try:
822 self.clean_fields(exclude=exclude)
823 except ValidationError as e:
824 errors = e.update_error_dict(errors)
825
826 # Form.clean() is run even if other validation fails, so do the
827 # same with Model.clean() for consistency.
828 try:
829 self.clean()
830 except ValidationError as e:
831 errors = e.update_error_dict(errors)
832
833 # Run unique checks, but only for fields that passed validation.
834 if validate_unique:
835 for name in errors:
836 if name != NON_FIELD_ERRORS and name not in exclude:
837 exclude.add(name)
838 try:
839 self.validate_unique(exclude=exclude)
840 except ValidationError as e:
841 errors = e.update_error_dict(errors)
842
843 # Run constraints checks, but only for fields that passed validation.
844 if validate_constraints:
845 for name in errors:
846 if name != NON_FIELD_ERRORS and name not in exclude:
847 exclude.add(name)
848 try:
849 self.validate_constraints(exclude=exclude)
850 except ValidationError as e:
851 errors = e.update_error_dict(errors)
852
853 if errors:
854 raise ValidationError(errors)
855
856 def clean_fields(self, exclude: set[str] | None = None) -> None:
857 """
858 Clean all fields and raise a ValidationError containing a dict
859 of all validation errors if any occur.
860 """
861 if exclude is None:
862 exclude = set()
863
864 errors = {}
865 for f in self._model_meta.fields:
866 if f.name in exclude:
867 continue
868 # Skip validation for empty fields with required=False. The developer
869 # is responsible for making sure they have a valid value.
870 raw_value = getattr(self, f.attname)
871 if not f.required and raw_value in f.empty_values:
872 continue
873 try:
874 setattr(self, f.attname, f.clean(raw_value, self))
875 except ValidationError as e:
876 errors[f.name] = e.error_list
877
878 if errors:
879 raise ValidationError(errors)
880
881 @classmethod
882 def preflight(cls) -> list[PreflightResult]:
883 errors: list[PreflightResult] = []
884
885 errors += [
886 *cls._check_fields(),
887 *cls._check_m2m_through_same_relationship(),
888 *cls._check_long_column_names(),
889 ]
890 clash_errors = (
891 *cls._check_id_field(),
892 *cls._check_field_name_clashes(),
893 *cls._check_model_name_db_lookup_clashes(),
894 *cls._check_property_name_related_field_accessor_clashes(),
895 *cls._check_single_primary_key(),
896 )
897 errors.extend(clash_errors)
898 # If there are field name clashes, hide consequent column name
899 # clashes.
900 if not clash_errors:
901 errors.extend(cls._check_column_name_clashes())
902 errors += [
903 *cls._check_indexes(),
904 *cls._check_ordering(),
905 *cls._check_constraints(),
906 *cls._check_db_table_comment(),
907 ]
908
909 return errors
910
911 @classmethod
912 def _check_db_table_comment(cls) -> list[PreflightResult]:
913 if not cls.model_options.db_table_comment:
914 return []
915 errors: list[PreflightResult] = []
916 if not (
917 db_connection.features.supports_comments
918 or "supports_comments" in cls.model_options.required_db_features
919 ):
920 errors.append(
921 PreflightResult(
922 fix=f"{db_connection.display_name} does not support comments on "
923 f"tables (db_table_comment).",
924 obj=cls,
925 id="models.db_table_comment_unsupported",
926 warning=True,
927 )
928 )
929 return errors
930
931 @classmethod
932 def _check_fields(cls) -> list[PreflightResult]:
933 """Perform all field checks."""
934 errors: list[PreflightResult] = []
935 for field in cls._model_meta.local_fields:
936 errors.extend(field.preflight(from_model=cls))
937 for field in cls._model_meta.local_many_to_many:
938 errors.extend(field.preflight(from_model=cls))
939 return errors
940
941 @classmethod
942 def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
943 """Check if no relationship model is used by more than one m2m field."""
944
945 errors: list[PreflightResult] = []
946 seen_intermediary_signatures = []
947
948 fields = cls._model_meta.local_many_to_many
949
950 # Skip when the target model wasn't found.
951 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
952
953 # Skip when the relationship model wasn't found.
954 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
955
956 for f in fields:
957 signature = (
958 f.remote_field.model,
959 cls,
960 f.remote_field.through,
961 f.remote_field.through_fields,
962 )
963 if signature in seen_intermediary_signatures:
964 errors.append(
965 PreflightResult(
966 fix="The model has two identical many-to-many relations "
967 f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
968 obj=cls,
969 id="models.duplicate_many_to_many_relations",
970 )
971 )
972 else:
973 seen_intermediary_signatures.append(signature)
974 return errors
975
976 @classmethod
977 def _check_id_field(cls) -> list[PreflightResult]:
978 """Disallow user-defined fields named ``id``."""
979 if any(
980 f
981 for f in cls._model_meta.local_fields
982 if f.name == "id" and not f.auto_created
983 ):
984 return [
985 PreflightResult(
986 fix="'id' is a reserved word that cannot be used as a field name.",
987 obj=cls,
988 id="models.reserved_field_name_id",
989 )
990 ]
991 return []
992
993 @classmethod
994 def _check_field_name_clashes(cls) -> list[PreflightResult]:
995 """Forbid field shadowing in multi-table inheritance."""
996 errors: list[PreflightResult] = []
997 used_fields = {} # name or attname -> field
998
999 for f in cls._model_meta.local_fields:
1000 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1001 # Note that we may detect clash between user-defined non-unique
1002 # field "id" and automatically added unique field "id", both
1003 # defined at the same model. This special case is considered in
1004 # _check_id_field and here we ignore it.
1005 id_conflict = (
1006 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1007 )
1008 if clash and not id_conflict:
1009 errors.append(
1010 PreflightResult(
1011 fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
1012 f"from model '{clash.model.model_options}'.",
1013 obj=f,
1014 id="models.field_name_clash",
1015 )
1016 )
1017 used_fields[f.name] = f
1018 used_fields[f.attname] = f
1019
1020 return errors
1021
1022 @classmethod
1023 def _check_column_name_clashes(cls) -> list[PreflightResult]:
1024 # Store a list of column names which have already been used by other fields.
1025 used_column_names: list[str] = []
1026 errors: list[PreflightResult] = []
1027
1028 for f in cls._model_meta.local_fields:
1029 _, column_name = f.get_attname_column()
1030
1031 # Ensure the column name is not already in use.
1032 if column_name and column_name in used_column_names:
1033 errors.append(
1034 PreflightResult(
1035 fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1036 "another field. Specify a 'db_column' for the field.",
1037 obj=cls,
1038 id="models.db_column_clash",
1039 )
1040 )
1041 else:
1042 used_column_names.append(column_name)
1043
1044 return errors
1045
1046 @classmethod
1047 def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1048 errors: list[PreflightResult] = []
1049 model_name = cls.__name__
1050 if model_name.startswith("_") or model_name.endswith("_"):
1051 errors.append(
1052 PreflightResult(
1053 fix=f"The model name '{model_name}' cannot start or end with an underscore "
1054 "as it collides with the query lookup syntax.",
1055 obj=cls,
1056 id="models.model_name_underscore_bounds",
1057 )
1058 )
1059 elif LOOKUP_SEP in model_name:
1060 errors.append(
1061 PreflightResult(
1062 fix=f"The model name '{model_name}' cannot contain double underscores as "
1063 "it collides with the query lookup syntax.",
1064 obj=cls,
1065 id="models.model_name_double_underscore",
1066 )
1067 )
1068 return errors
1069
1070 @classmethod
1071 def _check_property_name_related_field_accessor_clashes(
1072 cls,
1073 ) -> list[PreflightResult]:
1074 errors: list[PreflightResult] = []
1075 property_names = cls._model_meta._property_names
1076 related_field_accessors = (
1077 f.get_attname()
1078 for f in cls._model_meta._get_fields(reverse=False)
1079 if isinstance(f, RelatedField)
1080 )
1081 for accessor in related_field_accessors:
1082 if accessor in property_names:
1083 errors.append(
1084 PreflightResult(
1085 fix=f"The property '{accessor}' clashes with a related field "
1086 "accessor.",
1087 obj=cls,
1088 id="models.property_related_field_clash",
1089 )
1090 )
1091 return errors
1092
1093 @classmethod
1094 def _check_single_primary_key(cls) -> list[PreflightResult]:
1095 errors: list[PreflightResult] = []
1096 if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1097 errors.append(
1098 PreflightResult(
1099 fix="The model cannot have more than one field with "
1100 "'primary_key=True'.",
1101 obj=cls,
1102 id="models.multiple_primary_keys",
1103 )
1104 )
1105 return errors
1106
1107 @classmethod
1108 def _check_indexes(cls) -> list[PreflightResult]:
1109 """Check fields, names, and conditions of indexes."""
1110 errors: list[PreflightResult] = []
1111 references: set[str] = set()
1112 for index in cls.model_options.indexes:
1113 # Index name can't start with an underscore or a number, restricted
1114 # for cross-database compatibility with Oracle.
1115 if index.name[0] == "_" or index.name[0].isdigit():
1116 errors.append(
1117 PreflightResult(
1118 fix=f"The index name '{index.name}' cannot start with an underscore "
1119 "or a number.",
1120 obj=cls,
1121 id="models.index_name_invalid_start",
1122 ),
1123 )
1124 if len(index.name) > index.max_name_length:
1125 errors.append(
1126 PreflightResult(
1127 fix="The index name '%s' cannot be longer than %d " # noqa: UP031
1128 "characters." % (index.name, index.max_name_length),
1129 obj=cls,
1130 id="models.index_name_too_long",
1131 ),
1132 )
1133 if index.contains_expressions:
1134 for expression in index.expressions:
1135 references.update(
1136 ref[0] for ref in cls._get_expr_references(expression)
1137 )
1138 if not (
1139 db_connection.features.supports_partial_indexes
1140 or "supports_partial_indexes" in cls.model_options.required_db_features
1141 ) and any(index.condition is not None for index in cls.model_options.indexes):
1142 errors.append(
1143 PreflightResult(
1144 fix=f"{db_connection.display_name} does not support indexes with conditions. "
1145 "Conditions will be ignored. Silence this warning "
1146 "if you don't care about it.",
1147 warning=True,
1148 obj=cls,
1149 id="models.index_conditions_ignored",
1150 )
1151 )
1152 if not (
1153 db_connection.features.supports_covering_indexes
1154 or "supports_covering_indexes" in cls.model_options.required_db_features
1155 ) and any(index.include for index in cls.model_options.indexes):
1156 errors.append(
1157 PreflightResult(
1158 fix=f"{db_connection.display_name} does not support indexes with non-key columns. "
1159 "Non-key columns will be ignored. Silence this "
1160 "warning if you don't care about it.",
1161 warning=True,
1162 obj=cls,
1163 id="models.index_non_key_columns_ignored",
1164 )
1165 )
1166 if not (
1167 db_connection.features.supports_expression_indexes
1168 or "supports_expression_indexes" in cls.model_options.required_db_features
1169 ) and any(index.contains_expressions for index in cls.model_options.indexes):
1170 errors.append(
1171 PreflightResult(
1172 fix=f"{db_connection.display_name} does not support indexes on expressions. "
1173 "An index won't be created. Silence this warning "
1174 "if you don't care about it.",
1175 warning=True,
1176 obj=cls,
1177 id="models.index_on_foreign_key",
1178 )
1179 )
1180 fields = [
1181 field
1182 for index in cls.model_options.indexes
1183 for field, _ in index.fields_orders
1184 ]
1185 fields += [
1186 include for index in cls.model_options.indexes for include in index.include
1187 ]
1188 fields += references
1189 errors.extend(cls._check_local_fields(fields, "indexes"))
1190 return errors
1191
1192 @classmethod
1193 def _check_local_fields(
1194 cls, fields: Iterable[str], option: str
1195 ) -> list[PreflightResult]:
1196 # In order to avoid hitting the relation tree prematurely, we use our
1197 # own fields_map instead of using get_field()
1198 forward_fields_map: dict[str, Field] = {}
1199 for field in cls._model_meta._get_fields(reverse=False):
1200 forward_fields_map[field.name] = field
1201 if hasattr(field, "attname"):
1202 forward_fields_map[field.attname] = field
1203
1204 errors: list[PreflightResult] = []
1205 for field_name in fields:
1206 try:
1207 field = forward_fields_map[field_name]
1208 except KeyError:
1209 errors.append(
1210 PreflightResult(
1211 fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1212 obj=cls,
1213 id="models.nonexistent_field_reference",
1214 )
1215 )
1216 else:
1217 from plain.models.fields.related import ManyToManyField
1218
1219 if isinstance(field, ManyToManyField):
1220 errors.append(
1221 PreflightResult(
1222 fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1223 f"ManyToManyFields are not permitted in '{option}'.",
1224 obj=cls,
1225 id="models.m2m_field_in_meta_option",
1226 )
1227 )
1228 elif field not in cls._model_meta.local_fields:
1229 errors.append(
1230 PreflightResult(
1231 fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1232 f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1233 obj=cls,
1234 id="models.non_local_field_reference",
1235 )
1236 )
1237 return errors
1238
1239 @classmethod
1240 def _check_ordering(cls) -> list[PreflightResult]:
1241 """
1242 Check "ordering" option -- is it a list of strings and do all fields
1243 exist?
1244 """
1245
1246 if not cls.model_options.ordering:
1247 return []
1248
1249 if not isinstance(cls.model_options.ordering, list | tuple):
1250 return [
1251 PreflightResult(
1252 fix="'ordering' must be a tuple or list (even if you want to order by "
1253 "only one field).",
1254 obj=cls,
1255 id="models.ordering_not_tuple_or_list",
1256 )
1257 ]
1258
1259 errors: list[PreflightResult] = []
1260 fields = cls.model_options.ordering
1261
1262 # Skip expressions and '?' fields.
1263 fields = (f for f in fields if isinstance(f, str) and f != "?")
1264
1265 # Convert "-field" to "field".
1266 fields = (f.removeprefix("-") for f in fields)
1267
1268 # Separate related fields and non-related fields.
1269 _fields = []
1270 related_fields = []
1271 for f in fields:
1272 if LOOKUP_SEP in f:
1273 related_fields.append(f)
1274 else:
1275 _fields.append(f)
1276 fields = _fields
1277
1278 # Check related fields.
1279 for field in related_fields:
1280 _cls = cls
1281 fld = None
1282 for part in field.split(LOOKUP_SEP):
1283 try:
1284 fld = _cls._model_meta.get_field(part)
1285 if isinstance(fld, RelatedField):
1286 _cls = fld.path_infos[-1].to_meta.model
1287 else:
1288 _cls = None
1289 except (FieldDoesNotExist, AttributeError):
1290 if fld is None or (
1291 not isinstance(fld, Field)
1292 or (
1293 fld.get_transform(part) is None
1294 and fld.get_lookup(part) is None
1295 )
1296 ):
1297 errors.append(
1298 PreflightResult(
1299 fix="'ordering' refers to the nonexistent field, "
1300 f"related field, or lookup '{field}'.",
1301 obj=cls,
1302 id="models.ordering_nonexistent_field",
1303 )
1304 )
1305
1306 # Check for invalid or nonexistent fields in ordering.
1307 invalid_fields = []
1308
1309 # Any field name that is not present in field_names does not exist.
1310 # Also, ordering by m2m fields is not allowed.
1311 meta = cls._model_meta
1312 valid_fields = set(
1313 chain.from_iterable(
1314 (f.name, f.attname)
1315 if not (f.auto_created and not f.concrete)
1316 else (f.field.related_query_name(),)
1317 for f in chain(meta.fields, meta.related_objects)
1318 )
1319 )
1320
1321 invalid_fields.extend(set(fields) - valid_fields)
1322
1323 for invalid_field in invalid_fields:
1324 errors.append(
1325 PreflightResult(
1326 fix="'ordering' refers to the nonexistent field, related "
1327 f"field, or lookup '{invalid_field}'.",
1328 obj=cls,
1329 id="models.ordering_nonexistent_field",
1330 )
1331 )
1332 return errors
1333
1334 @classmethod
1335 def _check_long_column_names(cls) -> list[PreflightResult]:
1336 """
1337 Check that any auto-generated column names are shorter than the limits
1338 for each database in which the model will be created.
1339 """
1340 errors: list[PreflightResult] = []
1341 allowed_len = None
1342
1343 max_name_length = db_connection.ops.max_name_length()
1344 if max_name_length is not None and not db_connection.features.truncates_names:
1345 allowed_len = max_name_length
1346
1347 if allowed_len is None:
1348 return errors
1349
1350 for f in cls._model_meta.local_fields:
1351 _, column_name = f.get_attname_column()
1352
1353 # Check if auto-generated name for the field is too long
1354 # for the database.
1355 if (
1356 f.db_column is None
1357 and column_name is not None
1358 and len(column_name) > allowed_len
1359 ):
1360 errors.append(
1361 PreflightResult(
1362 fix=f'Autogenerated column name too long for field "{column_name}". '
1363 f'Maximum length is "{allowed_len}" for the database. '
1364 "Set the column name manually using 'db_column'.",
1365 obj=cls,
1366 id="models.autogenerated_column_name_too_long",
1367 )
1368 )
1369
1370 for f in cls._model_meta.local_many_to_many:
1371 # Skip nonexistent models.
1372 if isinstance(f.remote_field.through, str):
1373 continue
1374
1375 # Check if auto-generated name for the M2M field is too long
1376 # for the database.
1377 for m2m in f.remote_field.through._model_meta.local_fields:
1378 _, rel_name = m2m.get_attname_column()
1379 if (
1380 m2m.db_column is None
1381 and rel_name is not None
1382 and len(rel_name) > allowed_len
1383 ):
1384 errors.append(
1385 PreflightResult(
1386 fix="Autogenerated column name too long for M2M field "
1387 f'"{rel_name}". Maximum length is "{allowed_len}" for the database. '
1388 "Use 'through' to create a separate model for "
1389 "M2M and then set column_name using 'db_column'.",
1390 obj=cls,
1391 id="models.m2m_column_name_too_long",
1392 )
1393 )
1394
1395 return errors
1396
1397 @classmethod
1398 def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1399 if isinstance(expr, Q):
1400 for child in expr.children:
1401 if isinstance(child, tuple):
1402 lookup, value = child
1403 yield tuple(lookup.split(LOOKUP_SEP))
1404 yield from cls._get_expr_references(value)
1405 else:
1406 yield from cls._get_expr_references(child)
1407 elif isinstance(expr, F):
1408 yield tuple(expr.name.split(LOOKUP_SEP))
1409 elif hasattr(expr, "get_source_expressions"):
1410 for src_expr in expr.get_source_expressions():
1411 yield from cls._get_expr_references(src_expr)
1412
    @classmethod
    def _check_constraints(cls) -> list[PreflightResult]:
        """Check the model's constraints against the database and its fields.

        The check has three stages:

        1. Warn about constraint features the current connection does not
           support, unless the feature is listed in ``required_db_features``.
        2. Collect every field referenced by the constraints: the ``fields``
           and ``include`` of unique constraints plus references found in
           conditions, expressions and check ``Q`` objects. Check constraints
           containing ``RawSQL`` also produce a warning.
        3. Report references that would require a JOIN, then validate all
           collected names with ``_check_local_fields``.

        NOTE(review): several result ids below (for example
        ``models.constraint_on_non_db_field`` on the "does not support check
        constraints" warning) do not describe the condition they report.
        They are left untouched because ids may be referenced elsewhere;
        confirm before renaming.
        """
        errors: list[PreflightResult] = []
        # Stage 1: warnings for unsupported database features.
        if not (
            db_connection.features.supports_table_check_constraints
            or "supports_table_check_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, CheckConstraint)
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support check constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_non_db_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_partial_indexes
            or "supports_partial_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.condition is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with "
                    "conditions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_virtual_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_deferrable_unique_constraints
            or "supports_deferrable_unique_constraints"
            in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint)
            and constraint.deferrable is not None
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support deferrable unique constraints. "
                    "A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_foreign_key",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_covering_indexes
            or "supports_covering_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.include
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints with non-key "
                    "columns. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_m2m_field",
                    warning=True,
                )
            )

        if not (
            db_connection.features.supports_expression_indexes
            or "supports_expression_indexes" in cls.model_options.required_db_features
        ) and any(
            isinstance(constraint, UniqueConstraint) and constraint.contains_expressions
            for constraint in cls.model_options.constraints
        ):
            errors.append(
                PreflightResult(
                    fix=f"{db_connection.display_name} does not support unique constraints on "
                    "expressions. A constraint won't be created. Silence this "
                    "warning if you don't care about it.",
                    obj=cls,
                    id="models.constraint_on_self_referencing_fk",
                    warning=True,
                )
            )
        # Stage 2: gather the field names the constraints refer to. Start with
        # the plain field/include names of unique constraints.
        fields = set(
            chain.from_iterable(
                (*constraint.fields, *constraint.include)
                for constraint in cls.model_options.constraints
                if isinstance(constraint, UniqueConstraint)
            )
        )
        # Lookup paths (tuples of parts) found inside conditions/expressions.
        references = set()
        for constraint in cls.model_options.constraints:
            if isinstance(constraint, UniqueConstraint):
                # References are collected when the connection supports the
                # feature, or when the model has not listed it in
                # required_db_features.
                if (
                    db_connection.features.supports_partial_indexes
                    or "supports_partial_indexes"
                    not in cls.model_options.required_db_features
                ) and isinstance(constraint.condition, Q):
                    references.update(cls._get_expr_references(constraint.condition))
                if (
                    db_connection.features.supports_expression_indexes
                    or "supports_expression_indexes"
                    not in cls.model_options.required_db_features
                ) and constraint.contains_expressions:
                    for expression in constraint.expressions:
                        references.update(cls._get_expr_references(expression))
            elif isinstance(constraint, CheckConstraint):
                if (
                    db_connection.features.supports_table_check_constraints
                    or "supports_table_check_constraints"
                    not in cls.model_options.required_db_features
                ):
                    if isinstance(constraint.check, Q):
                        references.update(cls._get_expr_references(constraint.check))
                    if any(
                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
                    ):
                        errors.append(
                            PreflightResult(
                                fix=f"Check constraint {constraint.name!r} contains "
                                f"RawSQL() expression and won't be validated "
                                f"during the model full_clean(). "
                                "Silence this warning if you don't care about it.",
                                warning=True,
                                obj=cls,
                                id="models.constraint_name_collision_autogenerated",
                            ),
                        )
        # Stage 3: the first part of every reference is a field name; the
        # remaining parts are lookups that may imply a JOIN.
        for field_name, *lookups in references:
            fields.add(field_name)
            if not lookups:
                # If it has no lookups it cannot result in a JOIN.
                continue
            try:
                field = cls._model_meta.get_field(field_name)
                from plain.models.fields.related import ManyToManyField
                from plain.models.fields.reverse_related import ForeignKeyRel

                # Only forward, non-many-to-many relations are inspected
                # further; everything else is skipped here.
                if (
                    not isinstance(field, RelatedField)
                    or isinstance(field, ManyToManyField)
                    or isinstance(field, ForeignKeyRel)
                ):
                    continue
            except FieldDoesNotExist:
                # Unknown names are reported by _check_local_fields below.
                continue
            # JOIN must happen at the first lookup.
            first_lookup = lookups[0]
            if (
                hasattr(field, "get_transform")
                and hasattr(field, "get_lookup")
                and field.get_transform(first_lookup) is None
                and field.get_lookup(first_lookup) is None
            ):
                errors.append(
                    PreflightResult(
                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                        obj=cls,
                        id="models.constraint_refers_to_joined_field",
                    )
                )
        errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors
1589
1590
1591########
1592# MISC #
1593########
1594
1595
def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
    """Used to unpickle Model subclasses with deferred fields.

    ``model_id`` is either a tuple that is unpacked into
    ``models_registry.get_model()`` or the model class itself. The return
    value is a bare instance created with ``__new__``; ``__init__`` is not
    run here.
    """
    # Backwards compat - the model was cached directly in earlier versions,
    # so anything that is not a tuple is taken to be the class itself.
    model = (
        models_registry.get_model(*model_id)
        if isinstance(model_id, tuple)
        else model_id
    )
    return model.__new__(model)
1604
1605
# Pickle protocol marker - functions don't normally have this attribute, so it
# is attached to model_unpickle explicitly (hence the attr-defined ignore).
model_unpickle.__safe_for_unpickle__ = True  # type: ignore[attr-defined]