1from __future__ import annotations
2
3import copy
4import warnings
5from collections.abc import Iterable, Iterator, Sequence
6from itertools import chain
7from typing import TYPE_CHECKING, Any, cast
8
9if TYPE_CHECKING:
10 from plain.models.meta import Meta
11 from plain.models.options import Options
12
13import plain.runtime
14from plain.exceptions import NON_FIELD_ERRORS, ValidationError
15from plain.models import models_registry, transaction, types
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21)
22from plain.models.deletion import Collector
23from plain.models.exceptions import (
24 DoesNotExistDescriptor,
25 FieldDoesNotExist,
26 MultipleObjectsReturnedDescriptor,
27)
28from plain.models.expressions import RawSQL, Value
29from plain.models.fields import NOT_PROVIDED, Field
30from plain.models.fields.related import RelatedField
31from plain.models.fields.reverse_related import ForeignObjectRel
32from plain.models.meta import Meta
33from plain.models.options import Options
34from plain.models.postgres.sql import MAX_NAME_LENGTH
35from plain.models.query import F, Q, QuerySet
36from plain.preflight import PreflightResult
37from plain.utils.encoding import force_str
38from plain.utils.hashable import make_hashable
39
40
41class Deferred:
42 def __repr__(self) -> str:
43 return "<Deferred field>"
44
45 def __str__(self) -> str:
46 return "<Deferred field>"
47
48
49DEFERRED = Deferred()
50
51
52class ModelBase(type):
53 """Metaclass for all models."""
54
55 def __new__(
56 cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
57 ) -> type:
58 # Don't do any of this for the root models.Model class.
59 if not bases:
60 return super().__new__(cls, name, bases, attrs)
61
62 for base in bases:
63 # Models are required to directly inherit from model.Model, not a subclass of it.
64 if issubclass(base, Model) and base is not Model:
65 raise TypeError(
66 f"A model can't extend another model: {name} extends {base}"
67 )
68
69 return super().__new__(cls, name, bases, attrs, **kwargs)
70
71
72class ModelStateFieldsCacheDescriptor:
73 def __get__(
74 self, instance: ModelState | None, cls: type | None = None
75 ) -> ModelStateFieldsCacheDescriptor | dict[str, Any]:
76 if instance is None:
77 return self
78 res = instance.fields_cache = {}
79 return res
80
81
82class ModelState:
83 """Store model instance state."""
84
85 # If true, uniqueness validation checks will consider this a new, unsaved
86 # object. Necessary for correct validation of new instances of objects with
87 # explicit (non-auto) PKs. This impacts validation only; it has no effect
88 # on the actual save.
89 adding = True
90 fields_cache = ModelStateFieldsCacheDescriptor()
91
92
93class Model(metaclass=ModelBase):
94 # Every model gets an automatic id field
95 id: int = types.PrimaryKeyField()
96
97 # Descriptors for other model behavior
98 query: QuerySet[Model] = QuerySet()
99 model_options: Options = Options()
100 _model_meta: Meta = Meta()
101 DoesNotExist = DoesNotExistDescriptor()
102 MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
103
104 def __init__(self, **kwargs: Any):
105 # Alias some things as locals to avoid repeat global lookups
106 cls = self.__class__
107 meta = cls._model_meta
108 _setattr = setattr
109 _DEFERRED = DEFERRED
110
111 # Set up the storage for instance state
112 self._state = ModelState()
113
114 # Process all fields from kwargs or use defaults
115 for field in meta.fields:
116 from plain.models.fields.related import RelatedField
117
118 is_related_object = False
119 # Virtual field
120 if field.attname not in kwargs and field.column is None:
121 continue
122 if isinstance(field, RelatedField) and isinstance(
123 field.remote_field, ForeignObjectRel
124 ):
125 try:
126 # Assume object instance was passed in.
127 rel_obj = kwargs.pop(field.name)
128 is_related_object = True
129 except KeyError:
130 try:
131 # Object instance wasn't passed in -- must be an ID.
132 val = kwargs.pop(field.attname)
133 except KeyError:
134 val = field.get_default()
135 else:
136 try:
137 val = kwargs.pop(field.attname)
138 except KeyError:
139 # This is done with an exception rather than the
140 # default argument on pop because we don't want
141 # get_default() to be evaluated, and then not used.
142 # Refs #12057.
143 val = field.get_default()
144
145 if is_related_object:
146 # If we are passed a related instance, set it using the
147 # field.name instead of field.attname (e.g. "user" instead of
148 # "user_id") so that the object gets properly cached (and type
149 # checked) by the RelatedObjectDescriptor.
150 if rel_obj is not _DEFERRED:
151 _setattr(self, field.name, rel_obj)
152 else:
153 if val is not _DEFERRED:
154 _setattr(self, field.attname, val)
155
156 # Handle any remaining kwargs (properties or virtual fields)
157 property_names = meta._property_names
158 unexpected = ()
159 for prop, value in kwargs.items():
160 # Any remaining kwargs must correspond to properties or virtual
161 # fields.
162 if prop in property_names:
163 if value is not _DEFERRED:
164 _setattr(self, prop, value)
165 else:
166 try:
167 meta.get_field(prop)
168 except FieldDoesNotExist:
169 unexpected += (prop,)
170 else:
171 if value is not _DEFERRED:
172 _setattr(self, prop, value)
173 if unexpected:
174 unexpected_names = ", ".join(repr(n) for n in unexpected)
175 raise TypeError(
176 f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
177 )
178
179 super().__init__()
180
181 @classmethod
182 def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
183 if len(values) != len(cls._model_meta.concrete_fields):
184 values_iter = iter(values)
185 values = [
186 next(values_iter) if f.attname in field_names else DEFERRED
187 for f in cls._model_meta.concrete_fields
188 ]
189 # Build kwargs dict from field names and values
190 field_dict = dict(
191 zip((f.attname for f in cls._model_meta.concrete_fields), values)
192 )
193 new = cls(**field_dict)
194 new._state.adding = False
195 return new
196
197 def __repr__(self) -> str:
198 return f"<{self.__class__.__name__}: {self.id}>"
199
200 def __str__(self) -> str:
201 return f"{self.__class__.__name__} object ({self.id})"
202
203 def __eq__(self, other: object) -> bool:
204 if not isinstance(other, Model):
205 return NotImplemented
206 if self.__class__ != other.__class__:
207 return False
208 my_id = self.id
209 if my_id is None:
210 return self is other
211 return my_id == other.id
212
213 def __hash__(self) -> int:
214 if self.id is None:
215 raise TypeError("Model instances without primary key value are unhashable")
216 return hash(self.id)
217
218 def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
219 data = self.__getstate__()
220 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
221 class_id = (
222 self.model_options.package_label,
223 self.model_options.object_name,
224 )
225 return model_unpickle, (class_id,), data
226
227 def __getstate__(self) -> dict[str, Any]:
228 """Hook to allow choosing the attributes to pickle."""
229 state = self.__dict__.copy()
230 state["_state"] = copy.copy(state["_state"])
231 state["_state"].fields_cache = state["_state"].fields_cache.copy()
232 # memoryview cannot be pickled, so cast it to bytes and store
233 # separately.
234 _memoryview_attrs = []
235 for attr, value in state.items():
236 if isinstance(value, memoryview):
237 _memoryview_attrs.append((attr, bytes(value)))
238 if _memoryview_attrs:
239 state["_memoryview_attrs"] = _memoryview_attrs
240 for attr, value in _memoryview_attrs:
241 state.pop(attr)
242 return state
243
244 def __setstate__(self, state: dict[str, Any]) -> None:
245 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
246 if pickled_version:
247 if pickled_version != plain.runtime.__version__:
248 warnings.warn(
249 f"Pickled model instance's Plain version {pickled_version} does not "
250 f"match the current version {plain.runtime.__version__}.",
251 RuntimeWarning,
252 stacklevel=2,
253 )
254 else:
255 warnings.warn(
256 "Pickled model instance's Plain version is not specified.",
257 RuntimeWarning,
258 stacklevel=2,
259 )
260 if "_memoryview_attrs" in state:
261 for attr, value in state.pop("_memoryview_attrs"):
262 state[attr] = memoryview(value)
263 self.__dict__.update(state)
264
265 def get_deferred_fields(self) -> set[str]:
266 """
267 Return a set containing names of deferred fields for this instance.
268 """
269 return {
270 f.attname
271 for f in self._model_meta.concrete_fields
272 if f.attname not in self.__dict__
273 }
274
275 def refresh_from_db(self, fields: list[str] | None = None) -> None:
276 """
277 Reload field values from the database.
278
279 Fields can be used to specify which fields to reload. If fields is
280 None, then all non-deferred fields are reloaded.
281
282 When accessing deferred fields of an instance, the deferred loading
283 of the field will call this method.
284 """
285 if fields is None:
286 self._prefetched_objects_cache = {}
287 else:
288 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", {})
289 for field in fields:
290 if field in prefetched_objects_cache:
291 del prefetched_objects_cache[field]
292 fields.remove(field)
293 if not fields:
294 return
295 if any(LOOKUP_SEP in f for f in fields):
296 raise ValueError(
297 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
298 "are not allowed in fields."
299 )
300
301 db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
302
303 # Use provided fields, if not set then reload all non-deferred fields.
304 deferred_fields = self.get_deferred_fields()
305 if fields is not None:
306 fields = list(fields)
307 db_instance_qs = db_instance_qs.only(*fields)
308 elif deferred_fields:
309 fields = [
310 f.attname
311 for f in self._model_meta.concrete_fields
312 if f.attname not in deferred_fields
313 ]
314 db_instance_qs = db_instance_qs.only(*fields)
315
316 db_instance = db_instance_qs.get()
317 non_loaded_fields = db_instance.get_deferred_fields()
318 for field in self._model_meta.concrete_fields:
319 if field.attname in non_loaded_fields:
320 # This field wasn't refreshed - skip ahead.
321 continue
322 setattr(self, field.attname, getattr(db_instance, field.attname))
323 # Clear cached foreign keys.
324 if isinstance(field, RelatedField) and field.is_cached(self):
325 field.delete_cached_value(self)
326
327 # Clear cached relations.
328 for field in self._model_meta.related_objects:
329 if field.is_cached(self):
330 field.delete_cached_value(self)
331
332 def serializable_value(self, field_name: str) -> Any:
333 """
334 Return the value of the field name for this instance. If the field is
335 a foreign key, return the id value instead of the object. If there's
336 no Field object with this name on the model, return the model
337 attribute's value.
338
339 Used to serialize a field's value (in the serializer, or form output,
340 for example). Normally, you would just access the attribute directly
341 and not use this method.
342 """
343 try:
344 field = self._model_meta.get_forward_field(field_name)
345 except FieldDoesNotExist:
346 return getattr(self, field_name)
347 return getattr(self, field.attname)
348
349 def save(
350 self,
351 *,
352 clean_and_validate: bool = True,
353 force_insert: bool = False,
354 force_update: bool = False,
355 update_fields: Iterable[str] | None = None,
356 ) -> None:
357 """
358 Save the current instance. Override this in a subclass if you want to
359 control the saving process.
360
361 The 'force_insert' and 'force_update' parameters can be used to insist
362 that the "save" must be an SQL INSERT or UPDATE, respectively.
363 Normally, they should not be set.
364 """
365 self._prepare_related_fields_for_save(operation_name="save")
366
367 if force_insert and (force_update or update_fields):
368 raise ValueError("Cannot force both insert and updating in model saving.")
369
370 deferred_fields = self.get_deferred_fields()
371 if update_fields is not None:
372 # If update_fields is empty, skip the save. We do also check for
373 # no-op saves later on for inheritance cases. This bailout is
374 # still needed for skipping signal sending.
375 if not update_fields:
376 return
377
378 update_fields = frozenset(update_fields)
379 field_names = self._model_meta._non_pk_concrete_field_names
380 non_model_fields = update_fields.difference(field_names)
381
382 if non_model_fields:
383 raise ValueError(
384 "The following fields do not exist in this model, are m2m "
385 "fields, or are non-concrete fields: {}".format(
386 ", ".join(non_model_fields)
387 )
388 )
389
390 # If this model is deferred, automatically do an "update_fields" save
391 # on the loaded fields.
392 elif not force_insert and deferred_fields:
393 field_names = set()
394 for field in self._model_meta.concrete_fields:
395 if not field.primary_key and not hasattr(field, "through"):
396 field_names.add(field.attname)
397 loaded_fields = field_names.difference(deferred_fields)
398 if loaded_fields:
399 update_fields = frozenset(loaded_fields)
400
401 if clean_and_validate:
402 self.full_clean(exclude=deferred_fields)
403
404 self.save_base(
405 force_insert=force_insert,
406 force_update=force_update,
407 update_fields=update_fields,
408 )
409
410 def save_base(
411 self,
412 *,
413 raw: bool = False,
414 force_insert: bool = False,
415 force_update: bool = False,
416 update_fields: Iterable[str] | None = None,
417 ) -> None:
418 """
419 Handle the parts of saving which should be done only once per save,
420 yet need to be done in raw saves, too. This includes some sanity
421 checks and signal sending.
422
423 The 'raw' argument is telling save_base not to save any parent
424 models and not to do any changes to the values before save. This
425 is used by fixture loading.
426 """
427 assert not (force_insert and (force_update or update_fields))
428 assert update_fields is None or update_fields
429 cls = self.__class__
430
431 with transaction.mark_for_rollback_on_error():
432 self._save_table(
433 raw=raw,
434 cls=cls,
435 force_insert=force_insert,
436 force_update=force_update,
437 update_fields=update_fields,
438 )
439 # Once saved, this is no longer a to-be-added instance.
440 self._state.adding = False
441
442 def _save_table(
443 self,
444 *,
445 raw: bool,
446 cls: type[Model],
447 force_insert: bool = False,
448 force_update: bool = False,
449 update_fields: Iterable[str] | None = None,
450 ) -> bool:
451 """
452 Do the heavy-lifting involved in saving. Update or insert the data
453 for a single table.
454 """
455 meta = cls._model_meta
456 non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
457
458 if update_fields:
459 non_pks = [
460 f
461 for f in non_pks
462 if f.name in update_fields or f.attname in update_fields
463 ]
464
465 id_val = self.id
466 if id_val is None:
467 id_field = meta.get_forward_field("id")
468 id_val = id_field.get_id_value_on_save(self)
469 setattr(self, id_field.attname, id_val)
470 id_set = id_val is not None
471 if not id_set and (force_update or update_fields):
472 raise ValueError("Cannot force an update in save() with no primary key.")
473 updated = False
474 # Skip an UPDATE when adding an instance and primary key has a default.
475 if (
476 not raw
477 and not force_insert
478 and self._state.adding
479 and meta.get_forward_field("id").default
480 and meta.get_forward_field("id").default is not NOT_PROVIDED
481 ):
482 force_insert = True
483 # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
484 if id_set and not force_insert:
485 base_qs = meta.base_queryset
486 values = [
487 (
488 f,
489 None,
490 (getattr(self, f.attname) if raw else f.pre_save(self, False)),
491 )
492 for f in non_pks
493 ]
494 forced_update = bool(update_fields or force_update)
495 updated = self._do_update(
496 base_qs, id_val, values, update_fields, forced_update
497 )
498 if force_update and not updated:
499 raise DatabaseError("Forced update did not affect any rows.")
500 if update_fields and not updated:
501 raise DatabaseError("Save with update_fields did not affect any rows.")
502 if not updated:
503 fields = meta.local_concrete_fields
504 if not id_set:
505 id_field = meta.get_forward_field("id")
506 fields = [f for f in fields if f is not id_field]
507
508 returning_fields = meta.db_returning_fields
509 results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
510 if results:
511 for value, field in zip(results[0], returning_fields):
512 setattr(self, field.attname, value)
513 return updated
514
515 def _do_update(
516 self,
517 base_qs: QuerySet,
518 id_val: Any,
519 values: list[tuple[Any, Any, Any]],
520 update_fields: Iterable[str] | None,
521 forced_update: bool,
522 ) -> bool:
523 """
524 Try to update the model. Return True if the model was updated (if an
525 update query was done and a matching row was found in the DB).
526 """
527 filtered = base_qs.filter(id=id_val)
528 if not values:
529 # We can end up here when saving a model in inheritance chain where
530 # update_fields doesn't target any field in current model. In that
531 # case we just say the update succeeded. Another case ending up here
532 # is a model with just PK - in that case check that the PK still
533 # exists.
534 return update_fields is not None or filtered.exists()
535 return filtered._update(values) > 0
536
537 def _do_insert(
538 self,
539 manager: QuerySet,
540 fields: Sequence[Any],
541 returning_fields: Sequence[Any],
542 raw: bool,
543 ) -> list[tuple[Any, ...]] | None:
544 """
545 Do an INSERT. If returning_fields is defined then this method should
546 return the newly created data for the model.
547 """
548 return manager._insert(
549 [self],
550 fields=list(fields),
551 returning_fields=list(returning_fields) if returning_fields else None,
552 raw=raw,
553 )
554
555 def _prepare_related_fields_for_save(
556 self, operation_name: str, fields: Sequence[Any] | None = None
557 ) -> None:
558 # Ensure that a model instance without a PK hasn't been assigned to
559 # a ForeignKeyField on this model. If the field is nullable, allowing the save would result in silent data loss.
560 for field in self._model_meta.concrete_fields:
561 if fields and field not in fields:
562 continue
563 # If the related field isn't cached, then an instance hasn't been
564 # assigned and there's no need to worry about this check.
565 if isinstance(field, RelatedField) and field.is_cached(self):
566 obj = getattr(self, field.name, None)
567 if not obj:
568 continue
569 # A pk may have been assigned manually to a model instance not
570 # saved to the database (or auto-generated in a case like
571 # UUIDField), but we allow the save to proceed and rely on the
572 # database to raise an IntegrityError if applicable. If
573 # constraints aren't supported by the database, there's the
574 # unavoidable risk of data corruption.
575 if obj.id is None:
576 # Remove the object from a related instance cache.
577 if not field.remote_field.multiple:
578 field.remote_field.delete_cached_value(obj)
579 raise ValueError(
580 f"{operation_name}() prohibited to prevent data loss due to unsaved "
581 f"related object '{field.name}'."
582 )
583 elif getattr(self, field.attname) in field.empty_values:
584 # Set related object if it has been saved after an
585 # assignment.
586 setattr(self, field.name, obj)
587 # If the relationship's pk/to_field was changed, clear the
588 # cached relationship.
589 if getattr(obj, field.target_field.attname) != getattr(
590 self, field.attname
591 ):
592 field.delete_cached_value(self)
593
594 def delete(self) -> tuple[int, dict[str, int]]:
595 if self.id is None:
596 raise ValueError(
597 f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
598 "to None."
599 )
600 collector = Collector(origin=self)
601 collector.collect([self])
602 return collector.delete()
603
604 def get_field_display(self, field_name: str) -> str:
605 """Get the display value for a field, especially useful for fields with choices."""
606 # Get the field object from the field name
607 field = self._model_meta.get_forward_field(field_name)
608 value = getattr(self, field.attname)
609
610 # If field has no choices, just return the value as string
611 if not hasattr(field, "flatchoices") or not field.flatchoices:
612 return force_str(value, strings_only=True)
613
614 # For fields with choices, look up the display value
615 choices_dict = dict(make_hashable(field.flatchoices))
616 return force_str(
617 choices_dict.get(make_hashable(value), value), strings_only=True
618 )
619
620 def _get_field_value_map(
621 self, meta: Meta | None, exclude: set[str] | None = None
622 ) -> dict[str, Value]:
623 if exclude is None:
624 exclude = set()
625 meta = meta or self._model_meta
626 return {
627 field.name: Value(getattr(self, field.attname), field)
628 for field in meta.local_concrete_fields
629 if field.name not in exclude
630 }
631
632 def prepare_database_save(self, field: Any) -> Any:
633 if self.id is None:
634 raise ValueError(
635 f"Unsaved model instance {self!r} cannot be used in an ORM query."
636 )
637 return getattr(self, field.remote_field.get_related_field().attname)
638
639 def clean(self) -> None:
640 """
641 Hook for doing any extra model-wide validation after clean() has been
642 called on every field by self.clean_fields. Any ValidationError raised
643 by this method will not be associated with a particular field; it will
644 have a special-case association with the field defined by NON_FIELD_ERRORS.
645 """
646 pass
647
648 def validate_unique(self, exclude: set[str] | None = None) -> None:
649 """
650 Check unique constraints on the model and raise ValidationError if any
651 failed.
652 """
653 unique_checks = self._get_unique_checks(exclude=exclude)
654
655 if errors := self._perform_unique_checks(unique_checks):
656 raise ValidationError(errors)
657
658 def _get_unique_checks(
659 self, exclude: set[str] | None = None
660 ) -> list[tuple[type[Model], tuple[str, ...]]]:
661 """
662 Return a list of checks to perform. Since validate_unique() could be
663 called from a ModelForm, some fields may have been excluded; we can't
664 perform a unique check on a model that is missing fields involved
665 in that check. Fields that did not validate should also be excluded,
666 but they need to be passed in via the exclude argument.
667 """
668 if exclude is None:
669 exclude = set()
670 unique_checks = []
671
672 # Gather a list of checks for fields declared as unique and add them to
673 # the list of checks.
674
675 fields_with_class = [(self.__class__, self._model_meta.local_fields)]
676
677 for model_class, fields in fields_with_class:
678 for f in fields:
679 name = f.name
680 if name in exclude:
681 continue
682 if f.primary_key:
683 unique_checks.append((model_class, (name,)))
684
685 return unique_checks
686
687 def _perform_unique_checks(
688 self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
689 ) -> dict[str, list[ValidationError]]:
690 errors = {}
691
692 for model_class, unique_check in unique_checks:
693 # Try to look up an existing object with the same values as this
694 # object's values for all the unique field.
695
696 lookup_kwargs = {}
697 for field_name in unique_check:
698 f = self._model_meta.get_forward_field(field_name)
699 lookup_value = getattr(self, f.attname)
700 if lookup_value is None:
701 # no value, skip the lookup
702 continue
703 if f.primary_key and not self._state.adding:
704 # no need to check for unique primary key when editing
705 continue
706 lookup_kwargs[str(field_name)] = lookup_value
707
708 # some fields were skipped, no reason to do the check
709 if len(unique_check) != len(lookup_kwargs):
710 continue
711
712 qs = model_class.query.filter(**lookup_kwargs)
713
714 # Exclude the current object from the query if we are editing an
715 # instance (as opposed to creating a new one)
716 # Use the primary key defined by model_class. In previous versions
717 # this could differ from `self.id` due to model inheritance.
718 model_class_id = getattr(self, "id")
719 if not self._state.adding and model_class_id is not None:
720 qs = qs.exclude(id=model_class_id)
721 if qs.exists():
722 if len(unique_check) == 1:
723 key = unique_check[0]
724 else:
725 key = NON_FIELD_ERRORS
726 errors.setdefault(key, []).append(
727 self.unique_error_message(model_class, unique_check)
728 )
729
730 return errors
731
732 def unique_error_message(
733 self, model_class: type[Model], unique_check: tuple[str, ...]
734 ) -> ValidationError:
735 meta = model_class._model_meta
736
737 params = {
738 "model": self,
739 "model_class": model_class,
740 "model_name": model_class.model_options.model_name,
741 "unique_check": unique_check,
742 }
743
744 if len(unique_check) == 1:
745 field = meta.get_forward_field(unique_check[0])
746 params["field_label"] = field.name
747 return ValidationError(
748 message=field.error_messages["unique"],
749 code="unique",
750 params=params,
751 )
752 else:
753 field_names = [meta.get_forward_field(f).name for f in unique_check]
754
755 # Put an "and" before the last one
756 field_names[-1] = f"and {field_names[-1]}"
757
758 if len(field_names) > 2:
759 # Comma join if more than 2
760 params["field_label"] = ", ".join(cast(list[str], field_names))
761 else:
762 # Just a space if there are only 2
763 params["field_label"] = " ".join(cast(list[str], field_names))
764
765 # Use the first field as the message format...
766 message = meta.get_forward_field(unique_check[0]).error_messages["unique"]
767
768 return ValidationError(
769 message=message,
770 code="unique",
771 params=params,
772 )
773
774 def get_constraints(self) -> list[tuple[type[Model], list[Any]]]:
775 constraints = [(self.__class__, list(self.model_options.constraints))]
776 return constraints
777
778 def validate_constraints(self, exclude: set[str] | None = None) -> None:
779 constraints = self.get_constraints()
780
781 errors = {}
782 for model_class, model_constraints in constraints:
783 for constraint in model_constraints:
784 try:
785 constraint.validate(model_class, self, exclude=exclude)
786 except ValidationError as e:
787 if (
788 getattr(e, "code", None) == "unique"
789 and len(constraint.fields) == 1
790 ):
791 errors.setdefault(constraint.fields[0], []).append(e)
792 else:
793 errors = e.update_error_dict(errors)
794 if errors:
795 raise ValidationError(errors)
796
797 def full_clean(
798 self,
799 *,
800 exclude: set[str] | Iterable[str] | None = None,
801 validate_unique: bool = True,
802 validate_constraints: bool = True,
803 ) -> None:
804 """
805 Call clean_fields(), clean(), validate_unique(), and
806 validate_constraints() on the model. Raise a ValidationError for any
807 errors that occur.
808 """
809 errors = {}
810 if exclude is None:
811 exclude = set()
812 else:
813 exclude = set(exclude)
814
815 try:
816 self.clean_fields(exclude=exclude)
817 except ValidationError as e:
818 errors = e.update_error_dict(errors)
819
820 # Form.clean() is run even if other validation fails, so do the
821 # same with Model.clean() for consistency.
822 try:
823 self.clean()
824 except ValidationError as e:
825 errors = e.update_error_dict(errors)
826
827 # Run unique checks, but only for fields that passed validation.
828 if validate_unique:
829 for name in errors:
830 if name != NON_FIELD_ERRORS and name not in exclude:
831 exclude.add(name)
832 try:
833 self.validate_unique(exclude=exclude)
834 except ValidationError as e:
835 errors = e.update_error_dict(errors)
836
837 # Run constraints checks, but only for fields that passed validation.
838 if validate_constraints:
839 for name in errors:
840 if name != NON_FIELD_ERRORS and name not in exclude:
841 exclude.add(name)
842 try:
843 self.validate_constraints(exclude=exclude)
844 except ValidationError as e:
845 errors = e.update_error_dict(errors)
846
847 if errors:
848 raise ValidationError(errors)
849
850 def clean_fields(self, exclude: set[str] | None = None) -> None:
851 """
852 Clean all fields and raise a ValidationError containing a dict
853 of all validation errors if any occur.
854 """
855 if exclude is None:
856 exclude = set()
857
858 errors = {}
859 for f in self._model_meta.fields:
860 if f.name in exclude:
861 continue
862 # Skip validation for empty fields with required=False. The developer
863 # is responsible for making sure they have a valid value.
864 raw_value = getattr(self, f.attname)
865 if not f.required and raw_value in f.empty_values:
866 continue
867 try:
868 setattr(self, f.attname, f.clean(raw_value, self))
869 except ValidationError as e:
870 errors[f.name] = e.error_list
871
872 if errors:
873 raise ValidationError(errors)
874
875 @classmethod
876 def preflight(cls) -> list[PreflightResult]:
877 errors: list[PreflightResult] = []
878
879 errors += [
880 *cls._check_fields(),
881 *cls._check_m2m_through_same_relationship(),
882 *cls._check_long_column_names(),
883 ]
884 clash_errors = (
885 *cls._check_id_field(),
886 *cls._check_field_name_clashes(),
887 *cls._check_model_name_db_lookup_clashes(),
888 *cls._check_property_name_related_field_accessor_clashes(),
889 *cls._check_single_primary_key(),
890 )
891 errors.extend(clash_errors)
892 # If there are field name clashes, hide consequent column name
893 # clashes.
894 if not clash_errors:
895 errors.extend(cls._check_column_name_clashes())
896 errors += [
897 *cls._check_indexes(),
898 *cls._check_ordering(),
899 *cls._check_constraints(),
900 ]
901
902 return errors
903
904 @classmethod
905 def _check_fields(cls) -> list[PreflightResult]:
906 """Perform all field checks."""
907 errors: list[PreflightResult] = []
908 for field in cls._model_meta.local_fields:
909 errors.extend(field.preflight(from_model=cls))
910 for field in cls._model_meta.local_many_to_many:
911 errors.extend(field.preflight(from_model=cls))
912 return errors
913
914 @classmethod
915 def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
916 """Check if no relationship model is used by more than one m2m field."""
917
918 errors: list[PreflightResult] = []
919 seen_intermediary_signatures = []
920
921 fields = cls._model_meta.local_many_to_many
922
923 # Skip when the target model wasn't found.
924 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
925
926 # Skip when the relationship model wasn't found.
927 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
928
929 for f in fields:
930 signature = (
931 f.remote_field.model,
932 cls,
933 f.remote_field.through,
934 f.remote_field.through_fields,
935 )
936 if signature in seen_intermediary_signatures:
937 errors.append(
938 PreflightResult(
939 fix="The model has two identical many-to-many relations "
940 f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
941 obj=cls,
942 id="models.duplicate_many_to_many_relations",
943 )
944 )
945 else:
946 seen_intermediary_signatures.append(signature)
947 return errors
948
949 @classmethod
950 def _check_id_field(cls) -> list[PreflightResult]:
951 """Disallow user-defined fields named ``id``."""
952 if any(
953 f
954 for f in cls._model_meta.local_fields
955 if f.name == "id" and not f.auto_created
956 ):
957 return [
958 PreflightResult(
959 fix="'id' is a reserved word that cannot be used as a field name.",
960 obj=cls,
961 id="models.reserved_field_name_id",
962 )
963 ]
964 return []
965
966 @classmethod
967 def _check_field_name_clashes(cls) -> list[PreflightResult]:
968 """Forbid field shadowing in multi-table inheritance."""
969 errors: list[PreflightResult] = []
970 used_fields = {} # name or attname -> field
971
972 for f in cls._model_meta.local_fields:
973 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
974 # Note that we may detect clash between user-defined non-unique
975 # field "id" and automatically added unique field "id", both
976 # defined at the same model. This special case is considered in
977 # _check_id_field and here we ignore it.
978 id_conflict = (
979 f.name == "id" and clash and clash.name == "id" and clash.model == cls
980 )
981 if clash and not id_conflict:
982 errors.append(
983 PreflightResult(
984 fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
985 f"from model '{clash.model.model_options}'.",
986 obj=f,
987 id="models.field_name_clash",
988 )
989 )
990 used_fields[f.name] = f
991 used_fields[f.attname] = f
992
993 return errors
994
995 @classmethod
996 def _check_column_name_clashes(cls) -> list[PreflightResult]:
997 # Store a list of column names which have already been used by other fields.
998 used_column_names: list[str] = []
999 errors: list[PreflightResult] = []
1000
1001 for f in cls._model_meta.local_fields:
1002 column_name = f.column
1003
1004 # Ensure the column name is not already in use.
1005 if column_name and column_name in used_column_names:
1006 errors.append(
1007 PreflightResult(
1008 fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1009 "another field.",
1010 obj=cls,
1011 id="models.db_column_clash",
1012 )
1013 )
1014 else:
1015 used_column_names.append(column_name)
1016
1017 return errors
1018
1019 @classmethod
1020 def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1021 errors: list[PreflightResult] = []
1022 model_name = cls.__name__
1023 if model_name.startswith("_") or model_name.endswith("_"):
1024 errors.append(
1025 PreflightResult(
1026 fix=f"The model name '{model_name}' cannot start or end with an underscore "
1027 "as it collides with the query lookup syntax.",
1028 obj=cls,
1029 id="models.model_name_underscore_bounds",
1030 )
1031 )
1032 elif LOOKUP_SEP in model_name:
1033 errors.append(
1034 PreflightResult(
1035 fix=f"The model name '{model_name}' cannot contain double underscores as "
1036 "it collides with the query lookup syntax.",
1037 obj=cls,
1038 id="models.model_name_double_underscore",
1039 )
1040 )
1041 return errors
1042
1043 @classmethod
1044 def _check_property_name_related_field_accessor_clashes(
1045 cls,
1046 ) -> list[PreflightResult]:
1047 errors: list[PreflightResult] = []
1048 property_names = cls._model_meta._property_names
1049 related_field_accessors = (
1050 f.get_attname()
1051 for f in cls._model_meta._get_fields(reverse=False)
1052 if isinstance(f, RelatedField)
1053 )
1054 for accessor in related_field_accessors:
1055 if accessor in property_names:
1056 errors.append(
1057 PreflightResult(
1058 fix=f"The property '{accessor}' clashes with a related field "
1059 "accessor.",
1060 obj=cls,
1061 id="models.property_related_field_clash",
1062 )
1063 )
1064 return errors
1065
1066 @classmethod
1067 def _check_single_primary_key(cls) -> list[PreflightResult]:
1068 errors: list[PreflightResult] = []
1069 if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1070 errors.append(
1071 PreflightResult(
1072 fix="The model cannot have more than one field with "
1073 "'primary_key=True'.",
1074 obj=cls,
1075 id="models.multiple_primary_keys",
1076 )
1077 )
1078 return errors
1079
1080 @classmethod
1081 def _check_indexes(cls) -> list[PreflightResult]:
1082 """Check fields, names, and conditions of indexes."""
1083 errors: list[PreflightResult] = []
1084 references: set[str] = set()
1085 for index in cls.model_options.indexes:
1086 # Index name can't start with an underscore or a number
1087 if index.name[0] == "_" or index.name[0].isdigit():
1088 errors.append(
1089 PreflightResult(
1090 fix=f"The index name '{index.name}' cannot start with an underscore "
1091 "or a number.",
1092 obj=cls,
1093 id="models.index_name_invalid_start",
1094 ),
1095 )
1096 if len(index.name) > index.max_name_length:
1097 errors.append(
1098 PreflightResult(
1099 fix="The index name '%s' cannot be longer than %d " # noqa: UP031
1100 "characters." % (index.name, index.max_name_length),
1101 obj=cls,
1102 id="models.index_name_too_long",
1103 ),
1104 )
1105 if index.contains_expressions:
1106 for expression in index.expressions:
1107 references.update(
1108 ref[0] for ref in cls._get_expr_references(expression)
1109 )
1110 # Check fields referenced in indexes
1111 fields = [
1112 field
1113 for index in cls.model_options.indexes
1114 for field, _ in index.fields_orders
1115 ]
1116 fields += [
1117 include for index in cls.model_options.indexes for include in index.include
1118 ]
1119 fields += references
1120 errors.extend(cls._check_local_fields(fields, "indexes"))
1121 return errors
1122
1123 @classmethod
1124 def _check_local_fields(
1125 cls, fields: Iterable[str], option: str
1126 ) -> list[PreflightResult]:
1127 # In order to avoid hitting the relation tree prematurely, we use our
1128 # own fields_map instead of using get_field()
1129 forward_fields_map: dict[str, Field] = {}
1130 for field in cls._model_meta._get_fields(reverse=False):
1131 forward_fields_map[field.name] = field
1132 if hasattr(field, "attname"):
1133 forward_fields_map[field.attname] = field
1134
1135 errors: list[PreflightResult] = []
1136 for field_name in fields:
1137 try:
1138 field = forward_fields_map[field_name]
1139 except KeyError:
1140 errors.append(
1141 PreflightResult(
1142 fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1143 obj=cls,
1144 id="models.nonexistent_field_reference",
1145 )
1146 )
1147 else:
1148 from plain.models.fields.related import ManyToManyField
1149
1150 if isinstance(field, ManyToManyField):
1151 errors.append(
1152 PreflightResult(
1153 fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1154 f"ManyToManyFields are not permitted in '{option}'.",
1155 obj=cls,
1156 id="models.m2m_field_in_meta_option",
1157 )
1158 )
1159 elif field not in cls._model_meta.local_fields:
1160 errors.append(
1161 PreflightResult(
1162 fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1163 f"'{cls.model_options.object_name}'. This issue may be caused by multi-table inheritance.",
1164 obj=cls,
1165 id="models.non_local_field_reference",
1166 )
1167 )
1168 return errors
1169
1170 @classmethod
1171 def _check_ordering(cls) -> list[PreflightResult]:
1172 """
1173 Check "ordering" option -- is it a list of strings and do all fields
1174 exist?
1175 """
1176
1177 if not cls.model_options.ordering:
1178 return []
1179
1180 if not isinstance(cls.model_options.ordering, list | tuple):
1181 return [
1182 PreflightResult(
1183 fix="'ordering' must be a tuple or list (even if you want to order by "
1184 "only one field).",
1185 obj=cls,
1186 id="models.ordering_not_tuple_or_list",
1187 )
1188 ]
1189
1190 errors: list[PreflightResult] = []
1191 fields = cls.model_options.ordering
1192
1193 # Skip expressions and '?' fields.
1194 fields = (f for f in fields if isinstance(f, str) and f != "?")
1195
1196 # Convert "-field" to "field".
1197 fields = (f.removeprefix("-") for f in fields)
1198
1199 # Separate related fields and non-related fields.
1200 _fields = []
1201 related_fields = []
1202 for f in fields:
1203 if LOOKUP_SEP in f:
1204 related_fields.append(f)
1205 else:
1206 _fields.append(f)
1207 fields = _fields
1208
1209 # Check related fields.
1210 for field in related_fields:
1211 _cls = cls
1212 fld = None
1213 for part in field.split(LOOKUP_SEP):
1214 try:
1215 fld = _cls._model_meta.get_field(part)
1216 if isinstance(fld, RelatedField):
1217 _cls = fld.path_infos[-1].to_meta.model
1218 else:
1219 _cls = None
1220 except (FieldDoesNotExist, AttributeError):
1221 if fld is None or (
1222 not isinstance(fld, Field)
1223 or (
1224 fld.get_transform(part) is None
1225 and fld.get_lookup(part) is None
1226 )
1227 ):
1228 errors.append(
1229 PreflightResult(
1230 fix="'ordering' refers to the nonexistent field, "
1231 f"related field, or lookup '{field}'.",
1232 obj=cls,
1233 id="models.ordering_nonexistent_field",
1234 )
1235 )
1236
1237 # Check for invalid or nonexistent fields in ordering.
1238 invalid_fields = []
1239
1240 # Any field name that is not present in field_names does not exist.
1241 # Also, ordering by m2m fields is not allowed.
1242 meta = cls._model_meta
1243 valid_fields = set(
1244 chain.from_iterable(
1245 (f.name, f.attname)
1246 if not (f.auto_created and not f.concrete)
1247 else (f.field.related_query_name(),)
1248 for f in chain(meta.fields, meta.related_objects)
1249 )
1250 )
1251
1252 invalid_fields.extend(set(fields) - valid_fields)
1253
1254 for invalid_field in invalid_fields:
1255 errors.append(
1256 PreflightResult(
1257 fix="'ordering' refers to the nonexistent field, related "
1258 f"field, or lookup '{invalid_field}'.",
1259 obj=cls,
1260 id="models.ordering_nonexistent_field",
1261 )
1262 )
1263 return errors
1264
1265 @classmethod
1266 def _check_long_column_names(cls) -> list[PreflightResult]:
1267 """
1268 Check that any auto-generated column names are shorter than the limits
1269 for each database in which the model will be created.
1270 """
1271 errors: list[PreflightResult] = []
1272
1273 # PostgreSQL has a 63-character limit on identifier names and doesn't
1274 # silently truncate, so we check for names that are too long
1275 allowed_len = MAX_NAME_LENGTH
1276
1277 for f in cls._model_meta.local_fields:
1278 column_name = f.column
1279
1280 # Check if column name is too long for the database.
1281 if column_name is not None and len(column_name) > allowed_len:
1282 errors.append(
1283 PreflightResult(
1284 fix=f'Column name too long for field "{column_name}". '
1285 f'Maximum length is "{allowed_len}" for the database.',
1286 obj=cls,
1287 id="models.column_name_too_long",
1288 )
1289 )
1290
1291 for f in cls._model_meta.local_many_to_many:
1292 # Skip nonexistent models.
1293 if isinstance(f.remote_field.through, str):
1294 continue
1295
1296 # Check if column name for the M2M field is too long for the database.
1297 for m2m in f.remote_field.through._model_meta.local_fields:
1298 rel_name = m2m.column
1299 if rel_name is not None and len(rel_name) > allowed_len:
1300 errors.append(
1301 PreflightResult(
1302 fix="Column name too long for M2M field "
1303 f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
1304 obj=cls,
1305 id="models.m2m_column_name_too_long",
1306 )
1307 )
1308
1309 return errors
1310
1311 @classmethod
1312 def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1313 if isinstance(expr, Q):
1314 for child in expr.children:
1315 if isinstance(child, tuple):
1316 lookup, value = child
1317 yield tuple(lookup.split(LOOKUP_SEP))
1318 yield from cls._get_expr_references(value)
1319 else:
1320 yield from cls._get_expr_references(child)
1321 elif isinstance(expr, F):
1322 yield tuple(expr.name.split(LOOKUP_SEP))
1323 elif hasattr(expr, "get_source_expressions"):
1324 for src_expr in expr.get_source_expressions():
1325 yield from cls._get_expr_references(src_expr)
1326
1327 @classmethod
1328 def _check_constraints(cls) -> list[PreflightResult]:
1329 errors: list[PreflightResult] = []
1330 fields = set(
1331 chain.from_iterable(
1332 (*constraint.fields, *constraint.include)
1333 for constraint in cls.model_options.constraints
1334 if isinstance(constraint, UniqueConstraint)
1335 )
1336 )
1337 references = set()
1338 for constraint in cls.model_options.constraints:
1339 if isinstance(constraint, UniqueConstraint):
1340 if isinstance(constraint.condition, Q):
1341 references.update(cls._get_expr_references(constraint.condition))
1342 if constraint.contains_expressions:
1343 for expression in constraint.expressions:
1344 references.update(cls._get_expr_references(expression))
1345 elif isinstance(constraint, CheckConstraint):
1346 if isinstance(constraint.check, Q):
1347 references.update(cls._get_expr_references(constraint.check))
1348 if any(isinstance(expr, RawSQL) for expr in constraint.check.flatten()):
1349 errors.append(
1350 PreflightResult(
1351 fix=f"Check constraint {constraint.name!r} contains "
1352 f"RawSQL() expression and won't be validated "
1353 f"during the model full_clean(). "
1354 "Silence this warning if you don't care about it.",
1355 warning=True,
1356 obj=cls,
1357 id="models.constraint_name_collision_autogenerated",
1358 ),
1359 )
1360 for field_name, *lookups in references:
1361 fields.add(field_name)
1362 if not lookups:
1363 # If it has no lookups it cannot result in a JOIN.
1364 continue
1365 try:
1366 field = cls._model_meta.get_field(field_name)
1367 from plain.models.fields.related import ManyToManyField
1368 from plain.models.fields.reverse_related import ForeignKeyRel
1369
1370 if (
1371 not isinstance(field, RelatedField)
1372 or isinstance(field, ManyToManyField)
1373 or isinstance(field, ForeignKeyRel)
1374 ):
1375 continue
1376 except FieldDoesNotExist:
1377 continue
1378 # JOIN must happen at the first lookup.
1379 first_lookup = lookups[0]
1380 if (
1381 hasattr(field, "get_transform")
1382 and hasattr(field, "get_lookup")
1383 and field.get_transform(first_lookup) is None
1384 and field.get_lookup(first_lookup) is None
1385 ):
1386 errors.append(
1387 PreflightResult(
1388 fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
1389 obj=cls,
1390 id="models.constraint_refers_to_joined_field",
1391 )
1392 )
1393 errors.extend(cls._check_local_fields(fields, "constraints"))
1394 return errors
1395
1396
1397########
1398# MISC #
1399########
1400
1401
1402def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
1403 """Used to unpickle Model subclasses with deferred fields."""
1404 if isinstance(model_id, tuple):
1405 model = models_registry.get_model(*model_id)
1406 else:
1407 # Backwards compat - the model was cached directly in earlier versions.
1408 model = model_id
1409 return model.__new__(model)
1410
1411
1412# Pickle protocol marker - functions don't normally have this attribute
1413model_unpickle.__safe_for_unpickle__ = True # type: ignore[attr-defined]