1import copy
2import inspect
3import warnings
4from itertools import chain
5
6import plain.runtime
7from plain import preflight
8from plain.exceptions import (
9 NON_FIELD_ERRORS,
10 FieldDoesNotExist,
11 MultipleObjectsReturned,
12 ObjectDoesNotExist,
13 ValidationError,
14)
15from plain.models import models_registry, transaction
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21 db_connection,
22)
23from plain.models.deletion import Collector
24from plain.models.expressions import RawSQL, Value
25from plain.models.fields import NOT_PROVIDED
26from plain.models.fields.reverse_related import ForeignObjectRel
27from plain.models.options import Options
28from plain.models.query import F, Q, QuerySet
29from plain.packages import packages_registry
30from plain.utils.encoding import force_str
31from plain.utils.hashable import make_hashable
32
33
class Deferred:
    """Sentinel type marking a model field whose value was not loaded."""

    def __repr__(self):
        return "<Deferred field>"

    # str() and repr() render identically for this sentinel.
    __str__ = __repr__


# Module-wide singleton; code compares against it with `is`.
DEFERRED = Deferred()
43
44
45def _has_contribute_to_class(value):
46 # Only call contribute_to_class() if it's bound.
47 return not inspect.isclass(value) and hasattr(value, "contribute_to_class")
48
49
class ModelBase(type):
    """Metaclass for all models.

    Responsible for: validating the inheritance rules (models may only extend
    ``Model`` directly and may not inherit ``Meta``), attaching ``_meta``
    options, creating per-model exception classes, and running each field's
    ``contribute_to_class()`` hook.
    """

    def __new__(cls, name, bases, attrs, **kwargs):
        """Build and prepare a new model class."""
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models are required to directly inherit from model.Model, not a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )
            # Meta has to be defined on the model itself.
            if hasattr(base, "Meta"):
                raise TypeError(
                    "Meta can only be defined on a model itself, not a parent class: "
                    f"{name} extends {base}"
                )

        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        # Order matters: _meta must exist before exceptions/fields are attached.
        new_class._setup_meta()
        new_class._add_exceptions()

        # Now go back over all the attrs on this class see if they have a contribute_to_class() method.
        # Attributes with contribute_to_class are fields and meta options.
        for attr_name, attr_value in inspect.getmembers(new_class):
            if attr_name.startswith("_"):
                continue

            if _has_contribute_to_class(attr_value):
                if attr_name not in attrs:
                    # If the field came from an inherited class/mixin,
                    # we need to make a copy of it to avoid altering the
                    # original class and other classes that inherit from it.
                    field = copy.deepcopy(attr_value)
                else:
                    field = attr_value
                new_class.add_to_class(attr_name, field)

        new_class._meta.concrete_model = new_class

        # Copy indexes so that index names are unique when models extend another class.
        new_class._meta.indexes = [
            copy.deepcopy(idx) for idx in new_class._meta.indexes
        ]

        new_class._prepare()

        return new_class

    def add_to_class(cls, name, value):
        """Attach *value* to the class, honoring its contribute_to_class() hook."""
        if _has_contribute_to_class(value):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

    def _setup_meta(cls):
        """Resolve Meta options and the owning package, then attach ``_meta``."""
        name = cls.__name__
        module = cls.__module__

        # The model's Meta class, if it has one.
        meta = getattr(cls, "Meta", None)

        # Look for an application configuration to attach the model to.
        package_config = packages_registry.get_containing_package_config(module)

        package_label = getattr(meta, "package_label", None)
        if package_label is None:
            if package_config is None:
                raise RuntimeError(
                    f"Model class {module}.{name} doesn't declare an explicit "
                    "package_label and isn't in an application in "
                    "INSTALLED_PACKAGES."
                )
            else:
                package_label = package_config.package_label

        cls.add_to_class("_meta", Options(meta, package_label))

    def _add_exceptions(cls):
        """Create model-specific DoesNotExist/MultipleObjectsReturned subclasses."""
        cls.DoesNotExist = type(
            "DoesNotExist",
            (ObjectDoesNotExist,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.DoesNotExist",
            },
        )

        cls.MultipleObjectsReturned = type(
            "MultipleObjectsReturned",
            (MultipleObjectsReturned,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.MultipleObjectsReturned",
            },
        )

    def _prepare(cls):
        """Create some methods once self._meta has been populated."""
        opts = cls._meta
        opts._prepare(cls)

        # Give the class a docstring -- its definition.
        if cls.__doc__ is None:
            cls.__doc__ = "{}({})".format(
                cls.__name__,
                ", ".join(f.name for f in opts.fields),
            )

        # Set the name of _meta.indexes. This can't be done in
        # Options.contribute_to_class() because fields haven't been added to
        # the model at that point.
        for index in cls._meta.indexes:
            if not index.name:
                index.set_name_with_model(cls)

    @property
    def query(cls) -> QuerySet:
        """Create a new QuerySet for this model."""
        return cls._meta.queryset
174
175
class ModelStateFieldsCacheDescriptor:
    """Lazily attach an empty ``fields_cache`` dict on first instance access."""

    def __get__(self, instance, cls=None):
        # Accessed on the class itself: hand back the descriptor object.
        if instance is None:
            return self
        # First access on an instance: shadow the descriptor with a real
        # per-instance dict so subsequent reads bypass __get__ entirely.
        cache = instance.fields_cache = {}
        return cache
182
183
class ModelState:
    """Store model instance state."""

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    # Per-instance cache of related objects; the descriptor lazily creates a
    # fresh dict on first access (see ModelStateFieldsCacheDescriptor).
    fields_cache = ModelStateFieldsCacheDescriptor()
193
194
195class Model(metaclass=ModelBase):
196 DoesNotExist: type[ObjectDoesNotExist]
197 MultipleObjectsReturned: type[MultipleObjectsReturned]
198
    def __init__(self, *args, **kwargs):
        """
        Initialize a model instance from positional and/or keyword field values.

        Positional args map onto concrete fields in declaration order; kwargs
        may address a field by name (related-object form) or attname (raw FK id
        form). DEFERRED values are skipped so those attributes stay unset.
        Raises IndexError for too many positional args and TypeError for
        duplicate or unknown arguments.
        """
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        opts = self._meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work
        # The reason for the kwargs check is that standard iterator passes in by
        # args, and instantiation for iteration is 33% faster.
        if len(args) > len(opts.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            fields_iter = iter(opts.concrete_fields)
            # The ordering of the zip calls matter - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the second
            # is *not* consumed. We rely on this, so don't change the order
            # without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(opts.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                # Popping here also consumes the kwarg so it isn't seen again
                # in the leftover-kwargs pass below.
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            property_names = opts._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        opts.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()
307
308 @classmethod
309 def from_db(cls, field_names, values):
310 if len(values) != len(cls._meta.concrete_fields):
311 values_iter = iter(values)
312 values = [
313 next(values_iter) if f.attname in field_names else DEFERRED
314 for f in cls._meta.concrete_fields
315 ]
316 new = cls(*values)
317 new._state.adding = False
318 return new
319
320 def __repr__(self):
321 return f"<{self.__class__.__name__}: {self}>"
322
323 def __str__(self):
324 return f"{self.__class__.__name__} object ({self.id})"
325
326 def __eq__(self, other):
327 if not isinstance(other, Model):
328 return NotImplemented
329 if self._meta.concrete_model != other._meta.concrete_model:
330 return False
331 my_id = self.id
332 if my_id is None:
333 return self is other
334 return my_id == other.id
335
336 def __hash__(self):
337 if self.id is None:
338 raise TypeError("Model instances without primary key value are unhashable")
339 return hash(self.id)
340
341 def __reduce__(self):
342 data = self.__getstate__()
343 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
344 class_id = self._meta.package_label, self._meta.object_name
345 return model_unpickle, (class_id,), data
346
347 def __getstate__(self):
348 """Hook to allow choosing the attributes to pickle."""
349 state = self.__dict__.copy()
350 state["_state"] = copy.copy(state["_state"])
351 state["_state"].fields_cache = state["_state"].fields_cache.copy()
352 # memoryview cannot be pickled, so cast it to bytes and store
353 # separately.
354 _memoryview_attrs = []
355 for attr, value in state.items():
356 if isinstance(value, memoryview):
357 _memoryview_attrs.append((attr, bytes(value)))
358 if _memoryview_attrs:
359 state["_memoryview_attrs"] = _memoryview_attrs
360 for attr, value in _memoryview_attrs:
361 state.pop(attr)
362 return state
363
364 def __setstate__(self, state):
365 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
366 if pickled_version:
367 if pickled_version != plain.runtime.__version__:
368 warnings.warn(
369 f"Pickled model instance's Plain version {pickled_version} does not "
370 f"match the current version {plain.runtime.__version__}.",
371 RuntimeWarning,
372 stacklevel=2,
373 )
374 else:
375 warnings.warn(
376 "Pickled model instance's Plain version is not specified.",
377 RuntimeWarning,
378 stacklevel=2,
379 )
380 if "_memoryview_attrs" in state:
381 for attr, value in state.pop("_memoryview_attrs"):
382 state[attr] = memoryview(value)
383 self.__dict__.update(state)
384
385 def get_deferred_fields(self):
386 """
387 Return a set containing names of deferred fields for this instance.
388 """
389 return {
390 f.attname
391 for f in self._meta.concrete_fields
392 if f.attname not in self.__dict__
393 }
394
395 def refresh_from_db(self, fields=None):
396 """
397 Reload field values from the database.
398
399 By default, the reloading happens from the database this instance was
400 loaded from, or by the read router if this instance wasn't loaded from
401 any database. The using parameter will override the default.
402
403 Fields can be used to specify which fields to reload. The fields
404 should be an iterable of field attnames. If fields is None, then
405 all non-deferred fields are reloaded.
406
407 When accessing deferred fields of an instance, the deferred loading
408 of the field will call this method.
409 """
410 if fields is None:
411 self._prefetched_objects_cache = {}
412 else:
413 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
414 for field in fields:
415 if field in prefetched_objects_cache:
416 del prefetched_objects_cache[field]
417 fields.remove(field)
418 if not fields:
419 return
420 if any(LOOKUP_SEP in f for f in fields):
421 raise ValueError(
422 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
423 "are not allowed in fields."
424 )
425
426 db_instance_qs = self.__class__._meta.base_queryset.filter(id=self.id)
427
428 # Use provided fields, if not set then reload all non-deferred fields.
429 deferred_fields = self.get_deferred_fields()
430 if fields is not None:
431 fields = list(fields)
432 db_instance_qs = db_instance_qs.only(*fields)
433 elif deferred_fields:
434 fields = [
435 f.attname
436 for f in self._meta.concrete_fields
437 if f.attname not in deferred_fields
438 ]
439 db_instance_qs = db_instance_qs.only(*fields)
440
441 db_instance = db_instance_qs.get()
442 non_loaded_fields = db_instance.get_deferred_fields()
443 for field in self._meta.concrete_fields:
444 if field.attname in non_loaded_fields:
445 # This field wasn't refreshed - skip ahead.
446 continue
447 setattr(self, field.attname, getattr(db_instance, field.attname))
448 # Clear cached foreign keys.
449 if field.is_relation and field.is_cached(self):
450 field.delete_cached_value(self)
451
452 # Clear cached relations.
453 for field in self._meta.related_objects:
454 if field.is_cached(self):
455 field.delete_cached_value(self)
456
457 def serializable_value(self, field_name):
458 """
459 Return the value of the field name for this instance. If the field is
460 a foreign key, return the id value instead of the object. If there's
461 no Field object with this name on the model, return the model
462 attribute's value.
463
464 Used to serialize a field's value (in the serializer, or form output,
465 for example). Normally, you would just access the attribute directly
466 and not use this method.
467 """
468 try:
469 field = self._meta.get_field(field_name)
470 except FieldDoesNotExist:
471 return getattr(self, field_name)
472 return getattr(self, field.attname)
473
    def save(
        self,
        *,
        clean_and_validate=True,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        self._prepare_related_fields_for_save(operation_name="save")

        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            # Reject names that don't map to updatable concrete columns.
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If this model is deferred, automatically do an "update_fields" save
        # on the loaded fields.
        elif not force_insert and deferred_fields:
            field_names = set()
            for field in self._meta.concrete_fields:
                # Skip the pk and m2m-style fields (those expose "through").
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            # Deferred fields were never loaded, so they can't be validated.
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )
534
    def save_base(
        self,
        *,
        raw=False,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.

        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        # Internal invariants (save() validates these for callers): the flag
        # combinations below are contradictory or meaningless.
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        # Any database error inside the block marks the surrounding
        # transaction for rollback instead of leaving it half-applied.
        with transaction.mark_for_rollback_on_error():
            self._save_table(
                raw,
                cls,
                force_insert,
                force_update,
                update_fields,
            )
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False
566
    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.

        Returns True when an UPDATE touched an existing row, False when an
        INSERT was performed (or nothing was updated).
        """
        meta = cls._meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            # Restrict the UPDATE to the requested fields only.
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        id_val = self.id
        if id_val is None:
            # Give the pk field a chance to generate a value (e.g. a default).
            id_field = meta.get_field("id")
            id_val = id_field.get_id_value_on_save(self)
            setattr(self, id_field.attname, id_val)
        id_set = id_val is not None
        if not id_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.get_field("id").default
            and meta.get_field("id").default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if id_set and not force_insert:
            base_qs = meta.base_queryset
            # Triples of (field, model, value); raw saves bypass pre_save().
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, id_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not id_set:
                # Let the database assign the pk.
                id_field = meta.get_field("id")
                fields = [f for f in fields if f is not id_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
            if results:
                # Copy database-generated values (e.g. the new pk) back onto self.
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated
638
639 def _do_update(self, base_qs, id_val, values, update_fields, forced_update):
640 """
641 Try to update the model. Return True if the model was updated (if an
642 update query was done and a matching row was found in the DB).
643 """
644 filtered = base_qs.filter(id=id_val)
645 if not values:
646 # We can end up here when saving a model in inheritance chain where
647 # update_fields doesn't target any field in current model. In that
648 # case we just say the update succeeded. Another case ending up here
649 # is a model with just PK - in that case check that the PK still
650 # exists.
651 return update_fields is not None or filtered.exists()
652 return filtered._update(values) > 0
653
654 def _do_insert(self, manager, fields, returning_fields, raw):
655 """
656 Do an INSERT. If returning_fields is defined then this method should
657 return the newly created data for the model.
658 """
659 return manager._insert(
660 [self],
661 fields=fields,
662 returning_fields=returning_fields,
663 raw=raw,
664 )
665
    def _prepare_related_fields_for_save(self, operation_name, fields=None):
        """
        Sanity-check cached related objects before a save/create.

        Raises ValueError when an unsaved related instance is assigned to a
        ForeignKey, and re-syncs the raw FK attribute when the related object
        was saved (or its pk changed) after assignment.
        """
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the save would result in silent data loss.
        for field in self._meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)
702
703 def delete(self):
704 if self.id is None:
705 raise ValueError(
706 f"{self._meta.object_name} object can't be deleted because its id attribute is set "
707 "to None."
708 )
709 collector = Collector(origin=self)
710 collector.collect([self])
711 return collector.delete()
712
713 def _get_FIELD_display(self, field):
714 value = getattr(self, field.attname)
715 choices_dict = dict(make_hashable(field.flatchoices))
716 # force_str() to coerce lazy strings.
717 return force_str(
718 choices_dict.get(make_hashable(value), value), strings_only=True
719 )
720
721 def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
722 if not self.id:
723 raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
724 op = "gt" if is_next else "lt"
725 order = "" if is_next else "-"
726 param = getattr(self, field.attname)
727 q = Q.create([(field.name, param), (f"id__{op}", self.id)], connector=Q.AND)
728 q = Q.create([q, (f"{field.name}__{op}", param)], connector=Q.OR)
729 qs = (
730 self.__class__.query.filter(**kwargs)
731 .filter(q)
732 .order_by(f"{order}{field.name}", f"{order}id")
733 )
734 try:
735 return qs[0]
736 except IndexError:
737 raise self.DoesNotExist(
738 f"{self.__class__._meta.object_name} matching query does not exist."
739 )
740
741 def _get_field_value_map(self, meta, exclude=None):
742 if exclude is None:
743 exclude = set()
744 meta = meta or self._meta
745 return {
746 field.name: Value(getattr(self, field.attname), field)
747 for field in meta.local_concrete_fields
748 if field.name not in exclude
749 }
750
751 def prepare_database_save(self, field):
752 if self.id is None:
753 raise ValueError(
754 f"Unsaved model instance {self!r} cannot be used in an ORM query."
755 )
756 return getattr(self, field.remote_field.get_related_field().attname)
757
    def clean(self):
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.
        """
        # Intentionally a no-op; subclasses override as needed.
        pass
766
767 def validate_unique(self, exclude=None):
768 """
769 Check unique constraints on the model and raise ValidationError if any
770 failed.
771 """
772 unique_checks = self._get_unique_checks(exclude=exclude)
773
774 if errors := self._perform_unique_checks(unique_checks):
775 raise ValidationError(errors)
776
777 def _get_unique_checks(self, exclude=None):
778 """
779 Return a list of checks to perform. Since validate_unique() could be
780 called from a ModelForm, some fields may have been excluded; we can't
781 perform a unique check on a model that is missing fields involved
782 in that check. Fields that did not validate should also be excluded,
783 but they need to be passed in via the exclude argument.
784 """
785 if exclude is None:
786 exclude = set()
787 unique_checks = []
788
789 # Gather a list of checks for fields declared as unique and add them to
790 # the list of checks.
791
792 fields_with_class = [(self.__class__, self._meta.local_fields)]
793
794 for model_class, fields in fields_with_class:
795 for f in fields:
796 name = f.name
797 if name in exclude:
798 continue
799 if f.primary_key:
800 unique_checks.append((model_class, (name,)))
801
802 return unique_checks
803
    def _perform_unique_checks(self, unique_checks):
        """
        Run each (model_class, field_names) uniqueness check against the
        database and return a dict of field name -> list of ValidationErrors.
        """
        errors = {}

        for model_class, unique_check in unique_checks:
            # Try to look up an existing object with the same values as this
            # object's values for all the unique field.

            lookup_kwargs = {}
            for field_name in unique_check:
                f = self._meta.get_field(field_name)
                lookup_value = getattr(self, f.attname)
                # TODO: Handle multiple backends with different feature flags.
                if lookup_value is None:
                    # no value, skip the lookup
                    continue
                if f.primary_key and not self._state.adding:
                    # no need to check for unique primary key when editing
                    continue
                lookup_kwargs[str(field_name)] = lookup_value

            # some fields were skipped, no reason to do the check
            if len(unique_check) != len(lookup_kwargs):
                continue

            qs = model_class.query.filter(**lookup_kwargs)

            # Exclude the current object from the query if we are editing an
            # instance (as opposed to creating a new one)
            # Use the primary key defined by model_class. In previous versions
            # this could differ from `self.id` due to model inheritance.
            model_class_id = getattr(self, "id")
            if not self._state.adding and model_class_id is not None:
                qs = qs.exclude(id=model_class_id)
            if qs.exists():
                # Single-field checks attach to that field; composite checks
                # land under NON_FIELD_ERRORS.
                if len(unique_check) == 1:
                    key = unique_check[0]
                else:
                    key = NON_FIELD_ERRORS
                errors.setdefault(key, []).append(
                    self.unique_error_message(model_class, unique_check)
                )

        return errors
847
848 def unique_error_message(self, model_class, unique_check):
849 opts = model_class._meta
850
851 params = {
852 "model": self,
853 "model_class": model_class,
854 "model_name": opts.model_name,
855 "unique_check": unique_check,
856 }
857
858 if len(unique_check) == 1:
859 field = opts.get_field(unique_check[0])
860 params["field_label"] = field.name
861 return ValidationError(
862 message=field.error_messages["unique"],
863 code="unique",
864 params=params,
865 )
866 else:
867 field_names = [opts.get_field(f).name for f in unique_check]
868
869 # Put an "and" before the last one
870 field_names[-1] = f"and {field_names[-1]}"
871
872 if len(field_names) > 2:
873 # Comma join if more than 2
874 params["field_label"] = ", ".join(field_names)
875 else:
876 # Just a space if there are only 2
877 params["field_label"] = " ".join(field_names)
878
879 # Use the first field as the message format...
880 message = opts.get_field(unique_check[0]).error_messages["unique"]
881
882 return ValidationError(
883 message=message,
884 code="unique",
885 params=params,
886 )
887
888 def get_constraints(self):
889 constraints = [(self.__class__, self._meta.constraints)]
890 return constraints
891
892 def validate_constraints(self, exclude=None):
893 constraints = self.get_constraints()
894
895 errors = {}
896 for model_class, model_constraints in constraints:
897 for constraint in model_constraints:
898 try:
899 constraint.validate(model_class, self, exclude=exclude)
900 except ValidationError as e:
901 if (
902 getattr(e, "code", None) == "unique"
903 and len(constraint.fields) == 1
904 ):
905 errors.setdefault(constraint.fields[0], []).append(e)
906 else:
907 errors = e.update_error_dict(errors)
908 if errors:
909 raise ValidationError(errors)
910
911 def full_clean(
912 self, *, exclude=None, validate_unique=True, validate_constraints=True
913 ):
914 """
915 Call clean_fields(), clean(), validate_unique(), and
916 validate_constraints() on the model. Raise a ValidationError for any
917 errors that occur.
918 """
919 errors = {}
920 if exclude is None:
921 exclude = set()
922 else:
923 exclude = set(exclude)
924
925 try:
926 self.clean_fields(exclude=exclude)
927 except ValidationError as e:
928 errors = e.update_error_dict(errors)
929
930 # Form.clean() is run even if other validation fails, so do the
931 # same with Model.clean() for consistency.
932 try:
933 self.clean()
934 except ValidationError as e:
935 errors = e.update_error_dict(errors)
936
937 # Run unique checks, but only for fields that passed validation.
938 if validate_unique:
939 for name in errors:
940 if name != NON_FIELD_ERRORS and name not in exclude:
941 exclude.add(name)
942 try:
943 self.validate_unique(exclude=exclude)
944 except ValidationError as e:
945 errors = e.update_error_dict(errors)
946
947 # Run constraints checks, but only for fields that passed validation.
948 if validate_constraints:
949 for name in errors:
950 if name != NON_FIELD_ERRORS and name not in exclude:
951 exclude.add(name)
952 try:
953 self.validate_constraints(exclude=exclude)
954 except ValidationError as e:
955 errors = e.update_error_dict(errors)
956
957 if errors:
958 raise ValidationError(errors)
959
960 def clean_fields(self, exclude=None):
961 """
962 Clean all fields and raise a ValidationError containing a dict
963 of all validation errors if any occur.
964 """
965 if exclude is None:
966 exclude = set()
967
968 errors = {}
969 for f in self._meta.fields:
970 if f.name in exclude:
971 continue
972 # Skip validation for empty fields with required=False. The developer
973 # is responsible for making sure they have a valid value.
974 raw_value = getattr(self, f.attname)
975 if not f.required and raw_value in f.empty_values:
976 continue
977 try:
978 setattr(self, f.attname, f.clean(raw_value, self))
979 except ValidationError as e:
980 errors[f.name] = e.error_list
981
982 if errors:
983 raise ValidationError(errors)
984
985 @classmethod
986 def check(cls, **kwargs):
987 errors = []
988
989 database = kwargs.get("database", False)
990 errors += [
991 *cls._check_fields(**kwargs),
992 *cls._check_m2m_through_same_relationship(),
993 *cls._check_long_column_names(database),
994 ]
995 clash_errors = (
996 *cls._check_id_field(),
997 *cls._check_field_name_clashes(),
998 *cls._check_model_name_db_lookup_clashes(),
999 *cls._check_property_name_related_field_accessor_clashes(),
1000 *cls._check_single_primary_key(),
1001 )
1002 errors.extend(clash_errors)
1003 # If there are field name clashes, hide consequent column name
1004 # clashes.
1005 if not clash_errors:
1006 errors.extend(cls._check_column_name_clashes())
1007 errors += [
1008 *cls._check_indexes(database),
1009 *cls._check_ordering(),
1010 *cls._check_constraints(database),
1011 *cls._check_db_table_comment(database),
1012 ]
1013
1014 return errors
1015
1016 @classmethod
1017 def _check_db_table_comment(cls, database):
1018 if not cls._meta.db_table_comment or not database:
1019 return []
1020 errors = []
1021 if not (
1022 db_connection.features.supports_comments
1023 or "supports_comments" in cls._meta.required_db_features
1024 ):
1025 errors.append(
1026 preflight.Warning(
1027 f"{db_connection.display_name} does not support comments on "
1028 f"tables (db_table_comment).",
1029 obj=cls,
1030 id="models.W046",
1031 )
1032 )
1033 return errors
1034
1035 @classmethod
1036 def _check_fields(cls, **kwargs):
1037 """Perform all field checks."""
1038 errors = []
1039 for field in cls._meta.local_fields:
1040 errors.extend(field.check(**kwargs))
1041 for field in cls._meta.local_many_to_many:
1042 errors.extend(field.check(from_model=cls, **kwargs))
1043 return errors
1044
1045 @classmethod
1046 def _check_m2m_through_same_relationship(cls):
1047 """Check if no relationship model is used by more than one m2m field."""
1048
1049 errors = []
1050 seen_intermediary_signatures = []
1051
1052 fields = cls._meta.local_many_to_many
1053
1054 # Skip when the target model wasn't found.
1055 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
1056
1057 # Skip when the relationship model wasn't found.
1058 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
1059
1060 for f in fields:
1061 signature = (
1062 f.remote_field.model,
1063 cls,
1064 f.remote_field.through,
1065 f.remote_field.through_fields,
1066 )
1067 if signature in seen_intermediary_signatures:
1068 errors.append(
1069 preflight.Error(
1070 "The model has two identical many-to-many relations "
1071 f"through the intermediate model '{f.remote_field.through._meta.label}'.",
1072 obj=cls,
1073 id="models.E003",
1074 )
1075 )
1076 else:
1077 seen_intermediary_signatures.append(signature)
1078 return errors
1079
1080 @classmethod
1081 def _check_id_field(cls):
1082 """Disallow user-defined fields named ``id``."""
1083 if any(
1084 f for f in cls._meta.local_fields if f.name == "id" and not f.auto_created
1085 ):
1086 return [
1087 preflight.Error(
1088 "'id' is a reserved word that cannot be used as a field name.",
1089 obj=cls,
1090 id="models.E004",
1091 )
1092 ]
1093 return []
1094
1095 @classmethod
1096 def _check_field_name_clashes(cls):
1097 """Forbid field shadowing in multi-table inheritance."""
1098 errors = []
1099 used_fields = {} # name or attname -> field
1100
1101 for f in cls._meta.local_fields:
1102 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1103 # Note that we may detect clash between user-defined non-unique
1104 # field "id" and automatically added unique field "id", both
1105 # defined at the same model. This special case is considered in
1106 # _check_id_field and here we ignore it.
1107 id_conflict = (
1108 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1109 )
1110 if clash and not id_conflict:
1111 errors.append(
1112 preflight.Error(
1113 f"The field '{f.name}' clashes with the field '{clash.name}' "
1114 f"from model '{clash.model._meta}'.",
1115 obj=f,
1116 id="models.E006",
1117 )
1118 )
1119 used_fields[f.name] = f
1120 used_fields[f.attname] = f
1121
1122 return errors
1123
1124 @classmethod
1125 def _check_column_name_clashes(cls):
1126 # Store a list of column names which have already been used by other fields.
1127 used_column_names = []
1128 errors = []
1129
1130 for f in cls._meta.local_fields:
1131 _, column_name = f.get_attname_column()
1132
1133 # Ensure the column name is not already in use.
1134 if column_name and column_name in used_column_names:
1135 errors.append(
1136 preflight.Error(
1137 f"Field '{f.name}' has column name '{column_name}' that is used by "
1138 "another field.",
1139 hint="Specify a 'db_column' for the field.",
1140 obj=cls,
1141 id="models.E007",
1142 )
1143 )
1144 else:
1145 used_column_names.append(column_name)
1146
1147 return errors
1148
1149 @classmethod
1150 def _check_model_name_db_lookup_clashes(cls):
1151 errors = []
1152 model_name = cls.__name__
1153 if model_name.startswith("_") or model_name.endswith("_"):
1154 errors.append(
1155 preflight.Error(
1156 f"The model name '{model_name}' cannot start or end with an underscore "
1157 "as it collides with the query lookup syntax.",
1158 obj=cls,
1159 id="models.E023",
1160 )
1161 )
1162 elif LOOKUP_SEP in model_name:
1163 errors.append(
1164 preflight.Error(
1165 f"The model name '{model_name}' cannot contain double underscores as "
1166 "it collides with the query lookup syntax.",
1167 obj=cls,
1168 id="models.E024",
1169 )
1170 )
1171 return errors
1172
1173 @classmethod
1174 def _check_property_name_related_field_accessor_clashes(cls):
1175 errors = []
1176 property_names = cls._meta._property_names
1177 related_field_accessors = (
1178 f.get_attname()
1179 for f in cls._meta._get_fields(reverse=False)
1180 if f.is_relation and f.related_model is not None
1181 )
1182 for accessor in related_field_accessors:
1183 if accessor in property_names:
1184 errors.append(
1185 preflight.Error(
1186 f"The property '{accessor}' clashes with a related field "
1187 "accessor.",
1188 obj=cls,
1189 id="models.E025",
1190 )
1191 )
1192 return errors
1193
1194 @classmethod
1195 def _check_single_primary_key(cls):
1196 errors = []
1197 if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1:
1198 errors.append(
1199 preflight.Error(
1200 "The model cannot have more than one field with "
1201 "'primary_key=True'.",
1202 obj=cls,
1203 id="models.E026",
1204 )
1205 )
1206 return errors
1207
    @classmethod
    def _check_indexes(cls, database):
        """Check fields, names, and conditions of indexes."""
        errors = []
        # Field names referenced by expression-based indexes; validated
        # together with the plain index fields via _check_local_fields below.
        references = set()
        for index in cls._meta.indexes:
            # Index name can't start with an underscore or a number, restricted
            # for cross-database compatibility with Oracle.
            if index.name[0] == "_" or index.name[0].isdigit():
                errors.append(
                    preflight.Error(
                        f"The index name '{index.name}' cannot start with an underscore "
                        "or a number.",
                        obj=cls,
                        id="models.E033",
                    ),
                )
            if len(index.name) > index.max_name_length:
                errors.append(
                    preflight.Error(
                        "The index name '%s' cannot be longer than %d "  # noqa: UP031
                        "characters." % (index.name, index.max_name_length),
                        obj=cls,
                        id="models.E034",
                    ),
                )
            # Collect the leading field name of every reference inside an
            # expression index so it can be validated as a local field.
            if index.contains_expressions:
                for expression in index.expressions:
                    references.update(
                        ref[0] for ref in cls._get_expr_references(expression)
                    )
        # The following warnings are only emitted when checking against an
        # actual database (database=True) whose backend lacks the feature and
        # the model doesn't declare the feature in required_db_features.
        if (
            database
            and not (
                db_connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            )
            and any(index.condition is not None for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes with conditions.",
                    hint=(
                        "Conditions will be ignored. Silence this warning "
                        "if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W037",
                )
            )
        if (
            database
            and not (
                db_connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            )
            and any(index.include for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes with non-key columns.",
                    hint=(
                        "Non-key columns will be ignored. Silence this "
                        "warning if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W040",
                )
            )
        if (
            database
            and not (
                db_connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            )
            and any(index.contains_expressions for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes on expressions.",
                    hint=(
                        "An index won't be created. Silence this warning "
                        "if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W043",
                )
            )
        # Validate every name the indexes touch: ordered fields, covering
        # (include) columns, and fields referenced from expressions.
        fields = [
            field for index in cls._meta.indexes for field, _ in index.fields_orders
        ]
        fields += [include for index in cls._meta.indexes for include in index.include]
        fields += references
        errors.extend(cls._check_local_fields(fields, "indexes"))
        return errors
1303
1304 @classmethod
1305 def _check_local_fields(cls, fields, option):
1306 from plain import models
1307
1308 # In order to avoid hitting the relation tree prematurely, we use our
1309 # own fields_map instead of using get_field()
1310 forward_fields_map = {}
1311 for field in cls._meta._get_fields(reverse=False):
1312 forward_fields_map[field.name] = field
1313 if hasattr(field, "attname"):
1314 forward_fields_map[field.attname] = field
1315
1316 errors = []
1317 for field_name in fields:
1318 try:
1319 field = forward_fields_map[field_name]
1320 except KeyError:
1321 errors.append(
1322 preflight.Error(
1323 f"'{option}' refers to the nonexistent field '{field_name}'.",
1324 obj=cls,
1325 id="models.E012",
1326 )
1327 )
1328 else:
1329 if isinstance(field.remote_field, models.ManyToManyRel):
1330 errors.append(
1331 preflight.Error(
1332 f"'{option}' refers to a ManyToManyField '{field_name}', but "
1333 f"ManyToManyFields are not permitted in '{option}'.",
1334 obj=cls,
1335 id="models.E013",
1336 )
1337 )
1338 elif field not in cls._meta.local_fields:
1339 errors.append(
1340 preflight.Error(
1341 f"'{option}' refers to field '{field_name}' which is not local to model "
1342 f"'{cls._meta.object_name}'.",
1343 hint="This issue may be caused by multi-table inheritance.",
1344 obj=cls,
1345 id="models.E016",
1346 )
1347 )
1348 return errors
1349
    @classmethod
    def _check_ordering(cls):
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?
        """

        if not cls._meta.ordering:
            return []

        if not isinstance(cls._meta.ordering, list | tuple):
            return [
                preflight.Error(
                    "'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="models.E014",
                )
            ]

        errors = []
        fields = cls._meta.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields (containing "__") and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields: walk each lookup path part by part,
        # following relations from model to model.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    fld = _cls._meta.get_field(part)
                    if fld.is_relation:
                        # Continue resolution on the related model.
                        _cls = fld.path_infos[-1].to_opts.model
                    else:
                        # A non-relational field ends the path; any further
                        # part must be a transform/lookup on it.
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    # Not a field -- only an error if it's also not a valid
                    # transform or lookup on the previously resolved field.
                    if fld is None or (
                        fld.get_transform(part) is None and fld.get_lookup(part) is None
                    ):
                        errors.append(
                            preflight.Error(
                                "'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="models.E015",
                            )
                        )

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        opts = cls._meta
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(opts.fields, opts.related_objects)
            )
        )

        invalid_fields.extend(set(fields) - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                preflight.Error(
                    "'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="models.E015",
                )
            )
        return errors
1440
1441 @classmethod
1442 def _check_long_column_names(cls, database):
1443 """
1444 Check that any auto-generated column names are shorter than the limits
1445 for each database in which the model will be created.
1446 """
1447 if not database:
1448 return []
1449 errors = []
1450 allowed_len = None
1451
1452 max_name_length = db_connection.ops.max_name_length()
1453 if max_name_length is not None and not db_connection.features.truncates_names:
1454 allowed_len = max_name_length
1455
1456 if allowed_len is None:
1457 return errors
1458
1459 for f in cls._meta.local_fields:
1460 _, column_name = f.get_attname_column()
1461
1462 # Check if auto-generated name for the field is too long
1463 # for the database.
1464 if (
1465 f.db_column is None
1466 and column_name is not None
1467 and len(column_name) > allowed_len
1468 ):
1469 errors.append(
1470 preflight.Error(
1471 f'Autogenerated column name too long for field "{column_name}". '
1472 f'Maximum length is "{allowed_len}" for the database.',
1473 hint="Set the column name manually using 'db_column'.",
1474 obj=cls,
1475 id="models.E018",
1476 )
1477 )
1478
1479 for f in cls._meta.local_many_to_many:
1480 # Skip nonexistent models.
1481 if isinstance(f.remote_field.through, str):
1482 continue
1483
1484 # Check if auto-generated name for the M2M field is too long
1485 # for the database.
1486 for m2m in f.remote_field.through._meta.local_fields:
1487 _, rel_name = m2m.get_attname_column()
1488 if (
1489 m2m.db_column is None
1490 and rel_name is not None
1491 and len(rel_name) > allowed_len
1492 ):
1493 errors.append(
1494 preflight.Error(
1495 "Autogenerated column name too long for M2M field "
1496 f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
1497 hint=(
1498 "Use 'through' to create a separate model for "
1499 "M2M and then set column_name using 'db_column'."
1500 ),
1501 obj=cls,
1502 id="models.E019",
1503 )
1504 )
1505
1506 return errors
1507
1508 @classmethod
1509 def _get_expr_references(cls, expr):
1510 if isinstance(expr, Q):
1511 for child in expr.children:
1512 if isinstance(child, tuple):
1513 lookup, value = child
1514 yield tuple(lookup.split(LOOKUP_SEP))
1515 yield from cls._get_expr_references(value)
1516 else:
1517 yield from cls._get_expr_references(child)
1518 elif isinstance(expr, F):
1519 yield tuple(expr.name.split(LOOKUP_SEP))
1520 elif hasattr(expr, "get_source_expressions"):
1521 for src_expr in expr.get_source_expressions():
1522 yield from cls._get_expr_references(src_expr)
1523
    @classmethod
    def _check_constraints(cls, database):
        """Check Meta.constraints for backend support and valid field references."""
        errors = []
        # Backend-feature warnings are only emitted when checking against an
        # actual database (database=True) that lacks the feature and the model
        # doesn't declare it in required_db_features.
        if database:
            if not (
                db_connection.features.supports_table_check_constraints
                or "supports_table_check_constraints" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, CheckConstraint)
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support check constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W027",
                    )
                )
            if not (
                db_connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.condition is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints with "
                        "conditions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W036",
                    )
                )
            if not (
                db_connection.features.supports_deferrable_unique_constraints
                or "supports_deferrable_unique_constraints"
                in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.deferrable is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support deferrable unique constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W038",
                    )
                )
            if not (
                db_connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint) and constraint.include
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints with non-key "
                        "columns.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W039",
                    )
                )
            if not (
                db_connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.contains_expressions
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints on "
                        "expressions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W044",
                    )
                )
        # Plain field names used by UniqueConstraints (fields + include).
        fields = set(
            chain.from_iterable(
                (*constraint.fields, *constraint.include)
                for constraint in cls._meta.constraints
                if isinstance(constraint, UniqueConstraint)
            )
        )
        # Lookup paths referenced from constraint conditions/expressions,
        # gathered only for constraints the backend would actually enforce.
        references = set()
        for constraint in cls._meta.constraints:
            if isinstance(constraint, UniqueConstraint):
                if (
                    db_connection.features.supports_partial_indexes
                    or "supports_partial_indexes" not in cls._meta.required_db_features
                ) and isinstance(constraint.condition, Q):
                    references.update(cls._get_expr_references(constraint.condition))
                if (
                    db_connection.features.supports_expression_indexes
                    or "supports_expression_indexes"
                    not in cls._meta.required_db_features
                ) and constraint.contains_expressions:
                    for expression in constraint.expressions:
                        references.update(cls._get_expr_references(expression))
            elif isinstance(constraint, CheckConstraint):
                if (
                    db_connection.features.supports_table_check_constraints
                    or "supports_table_check_constraints"
                    not in cls._meta.required_db_features
                ):
                    if isinstance(constraint.check, Q):
                        references.update(cls._get_expr_references(constraint.check))
                    # RawSQL can't be introspected, so full_clean() can't
                    # validate such a constraint -- warn about it.
                    if any(
                        isinstance(expr, RawSQL) for expr in constraint.check.flatten()
                    ):
                        errors.append(
                            preflight.Warning(
                                f"Check constraint {constraint.name!r} contains "
                                f"RawSQL() expression and won't be validated "
                                f"during the model full_clean().",
                                hint=(
                                    "Silence this warning if you don't care about it."
                                ),
                                obj=cls,
                                id="models.W045",
                            ),
                        )
        # Any reference with lookups beyond the first field name would need a
        # JOIN, which constraints can't express -- report those as errors.
        for field_name, *lookups in references:
            # pk is an alias that won't be found by opts.get_field.
            fields.add(field_name)
            if not lookups:
                # If it has no lookups it cannot result in a JOIN.
                continue
            try:
                field = cls._meta.get_field(field_name)
                if not field.is_relation or field.many_to_many or field.one_to_many:
                    continue
            except FieldDoesNotExist:
                continue
            # JOIN must happen at the first lookup.
            first_lookup = lookups[0]
            if (
                hasattr(field, "get_transform")
                and hasattr(field, "get_lookup")
                and field.get_transform(first_lookup) is None
                and field.get_lookup(first_lookup) is None
            ):
                errors.append(
                    preflight.Error(
                        f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                        obj=cls,
                        id="models.E041",
                    )
                )
        errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors
1698
1699
1700########
1701# MISC #
1702########
1703
1704
def model_unpickle(model_id):
    """Used to unpickle Model subclasses with deferred fields."""
    if not isinstance(model_id, tuple):
        # Backwards compat - the model was cached directly in earlier versions.
        model = model_id
    else:
        model = models_registry.get_model(*model_id)
    # Allocate a bare instance without running __init__.
    return model.__new__(model)


model_unpickle.__safe_for_unpickle__ = True