import copy
import inspect
import warnings
from itertools import chain

import plain.runtime
from plain import preflight
from plain.exceptions import (
    NON_FIELD_ERRORS,
    FieldDoesNotExist,
    MultipleObjectsReturned,
    ObjectDoesNotExist,
    ValidationError,
)
from plain.models import models_registry, transaction
from plain.models.constants import LOOKUP_SEP
from plain.models.constraints import CheckConstraint, UniqueConstraint
from plain.models.db import (
    PLAIN_VERSION_PICKLE_KEY,
    DatabaseError,
    db_connection,
)
from plain.models.deletion import Collector
from plain.models.expressions import RawSQL, Value
from plain.models.fields import NOT_PROVIDED
from plain.models.fields.related import (
    ForeignObjectRel,
)
from plain.models.manager import Manager
from plain.models.options import Options
from plain.models.query import F, Q
from plain.packages import packages_registry
from plain.utils.encoding import force_str
from plain.utils.hashable import make_hashable


class Deferred:
    def __repr__(self):
        return "<Deferred field>"

    def __str__(self):
        return "<Deferred field>"


DEFERRED = Deferred()


def _has_contribute_to_class(value):
    # Only call contribute_to_class() if it's bound.
    return not inspect.isclass(value) and hasattr(value, "contribute_to_class")


class ModelBase(type):
    """Metaclass for all models."""

    def __new__(cls, name, bases, attrs, **kwargs):
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models are required to directly inherit from models.Model, not a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )
            # Meta has to be defined on the model itself.
            if hasattr(base, "Meta"):
                raise TypeError(
                    "Meta can only be defined on a model itself, not a parent class: "
                    f"{name} extends {base}"
                )

        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        new_class._setup_meta()
        new_class._add_exceptions()

        # Now go back over all the attrs on this class and see if they have a contribute_to_class() method.
        # Attributes with contribute_to_class are fields, meta options, and managers.
        for attr_name, attr_value in inspect.getmembers(new_class):
            if attr_name.startswith("_"):
                continue

            if _has_contribute_to_class(attr_value):
                if attr_name not in attrs:
                    # If the field came from an inherited class/mixin,
                    # we need to make a copy of it to avoid altering the
                    # original class and other classes that inherit from it.
                    field = copy.deepcopy(attr_value)
                else:
                    field = attr_value
                new_class.add_to_class(attr_name, field)

        new_class._meta.concrete_model = new_class

        # Copy indexes so that index names are unique when models extend another class.
        new_class._meta.indexes = [
            copy.deepcopy(idx) for idx in new_class._meta.indexes
        ]

        new_class._prepare()

        return new_class

    def add_to_class(cls, name, value):
        if _has_contribute_to_class(value):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

    def _setup_meta(cls):
        name = cls.__name__
        module = cls.__module__

        # The model's Meta class, if it has one.
        meta = getattr(cls, "Meta", None)

        # Look for an application configuration to attach the model to.
        package_config = packages_registry.get_containing_package_config(module)

        package_label = getattr(meta, "package_label", None)
        if package_label is None:
            if package_config is None:
                raise RuntimeError(
                    f"Model class {module}.{name} doesn't declare an explicit "
                    "package_label and isn't in an application in "
                    "INSTALLED_PACKAGES."
                )
            else:
                package_label = package_config.package_label

        cls.add_to_class("_meta", Options(meta, package_label))

    def _add_exceptions(cls):
        cls.DoesNotExist = type(
            "DoesNotExist",
            (ObjectDoesNotExist,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.DoesNotExist",
            },
        )

        cls.MultipleObjectsReturned = type(
            "MultipleObjectsReturned",
            (MultipleObjectsReturned,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.MultipleObjectsReturned",
            },
        )

    def _prepare(cls):
        """Create some methods once self._meta has been populated."""
        opts = cls._meta
        opts._prepare(cls)

        # Give the class a docstring -- its definition.
        if cls.__doc__ is None:
            cls.__doc__ = "{}({})".format(
                cls.__name__,
                ", ".join(f.name for f in opts.fields),
            )

        if not opts.managers:
            if any(f.name == "objects" for f in opts.fields):
                raise ValueError(
                    f"Model {cls.__name__} must specify a custom Manager, because it has a "
                    "field named 'objects'."
                )
            manager = Manager()
            manager.auto_created = True
            cls.add_to_class("objects", manager)

        # Set the name of _meta.indexes. This can't be done in
        # Options.contribute_to_class() because fields haven't been added to
        # the model at that point.
        for index in cls._meta.indexes:
            if not index.name:
                index.set_name_with_model(cls)

    @property
    def _base_manager(cls):
        return cls._meta.base_manager

    @property
    def _default_manager(cls):
        return cls._meta.default_manager

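# Illustrative sketch (not part of this module) of what ModelBase does when a
# model class is defined. The "Article" model, its field, and the "example"
# package_label below are assumptions for demonstration only.
#
#   from plain import models
#
#   class Article(models.Model):
#       title = models.CharField(max_length=200)
#
#       class Meta:
#           package_label = "example"
#
# At class-creation time the metaclass attaches `_meta` (an Options instance),
# adds Article.DoesNotExist and Article.MultipleObjectsReturned, copies any
# fields contributed by mixins, and auto-creates an `objects` manager because
# none was declared.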

class ModelStateFieldsCacheDescriptor:
    def __get__(self, instance, cls=None):
        if instance is None:
            return self
        res = instance.fields_cache = {}
        return res


class ModelState:
    """Store model instance state."""

    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    fields_cache = ModelStateFieldsCacheDescriptor()


class Model(metaclass=ModelBase):
    def __init__(self, *args, **kwargs):
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        opts = self._meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work.
        # The reason for the kwargs check is that the standard iterator passes
        # values in by args, and instantiation for iteration is 33% faster.
        if len(args) > len(opts.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            fields_iter = iter(opts.concrete_fields)
            # The ordering of the zip calls matters - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the second
            # is *not* consumed. We rely on this, so don't change the order
            # without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(opts.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            property_names = opts._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                # Any remaining kwargs must correspond to properties or virtual
                # fields.
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        opts.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()

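    # Illustrative usage sketch for the constructor above (the model and field
    # names are hypothetical): values may be passed positionally in
    # concrete-field order or as keyword arguments, but not both for the same
    # field.
    #
    #   Article(None, "Hello")             # positional, in concrete_fields order
    #   Article(title="Hello")             # keyword form
    #   Article(None, "x", title="y")      # raises TypeError (both forms for 'title')
    #   Article(nonexistent="x")           # raises TypeError (unexpected keyword)
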
    @classmethod
    def from_db(cls, field_names, values):
        if len(values) != len(cls._meta.concrete_fields):
            values_iter = iter(values)
            values = [
                next(values_iter) if f.attname in field_names else DEFERRED
                for f in cls._meta.concrete_fields
            ]
        new = cls(*values)
        new._state.adding = False
        return new

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self}>"

    def __str__(self):
        return f"{self.__class__.__name__} object ({self.pk})"

    def __eq__(self, other):
        if not isinstance(other, Model):
            return NotImplemented
        if self._meta.concrete_model != other._meta.concrete_model:
            return False
        my_pk = self.pk
        if my_pk is None:
            return self is other
        return my_pk == other.pk

    def __hash__(self):
        if self.pk is None:
            raise TypeError("Model instances without primary key value are unhashable")
        return hash(self.pk)

    def __reduce__(self):
        data = self.__getstate__()
        data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
        class_id = self._meta.package_label, self._meta.object_name
        return model_unpickle, (class_id,), data

    def __getstate__(self):
        """Hook to allow choosing the attributes to pickle."""
        state = self.__dict__.copy()
        state["_state"] = copy.copy(state["_state"])
        state["_state"].fields_cache = state["_state"].fields_cache.copy()
        # memoryview cannot be pickled, so cast it to bytes and store
        # separately.
        _memoryview_attrs = []
        for attr, value in state.items():
            if isinstance(value, memoryview):
                _memoryview_attrs.append((attr, bytes(value)))
        if _memoryview_attrs:
            state["_memoryview_attrs"] = _memoryview_attrs
            for attr, value in _memoryview_attrs:
                state.pop(attr)
        return state

    def __setstate__(self, state):
        pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
        if pickled_version:
            if pickled_version != plain.runtime.__version__:
                warnings.warn(
                    f"Pickled model instance's Plain version {pickled_version} does not "
                    f"match the current version {plain.runtime.__version__}.",
                    RuntimeWarning,
                    stacklevel=2,
                )
        else:
            warnings.warn(
                "Pickled model instance's Plain version is not specified.",
                RuntimeWarning,
                stacklevel=2,
            )
        if "_memoryview_attrs" in state:
            for attr, value in state.pop("_memoryview_attrs"):
                state[attr] = memoryview(value)
        self.__dict__.update(state)

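    # Pickling sketch: __reduce__/__getstate__/__setstate__ above make
    # instances picklable, warning when the Plain version differs. A minimal
    # round-trip, assuming a saved `article` instance exists:
    #
    #   import pickle
    #   restored = pickle.loads(pickle.dumps(article))
    #   assert restored.pk == article.pk
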
    def _get_pk_val(self, meta=None):
        meta = meta or self._meta
        return getattr(self, meta.pk.attname)

    def _set_pk_val(self, value):
        return setattr(self, self._meta.pk.attname, value)

    pk = property(_get_pk_val, _set_pk_val)

    def get_deferred_fields(self):
        """
        Return a set containing names of deferred fields for this instance.
        """
        return {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__
        }

    def refresh_from_db(self, fields=None):
        """
        Reload field values from the database.

        Fields can be used to specify which fields to reload. The fields
        should be an iterable of field attnames. If fields is None, then
        all non-deferred fields are reloaded.

        When accessing deferred fields of an instance, the deferred loading
        of the field will call this method.
        """
        if fields is None:
            self._prefetched_objects_cache = {}
        else:
            prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
            for field in fields:
                if field in prefetched_objects_cache:
                    del prefetched_objects_cache[field]
                    fields.remove(field)
            if not fields:
                return
            if any(LOOKUP_SEP in f for f in fields):
                raise ValueError(
                    f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
                    "are not allowed in fields."
                )

        db_instance_qs = self.__class__._base_manager.get_queryset().filter(pk=self.pk)

        # Use provided fields, if not set then reload all non-deferred fields.
        deferred_fields = self.get_deferred_fields()
        if fields is not None:
            fields = list(fields)
            db_instance_qs = db_instance_qs.only(*fields)
        elif deferred_fields:
            fields = [
                f.attname
                for f in self._meta.concrete_fields
                if f.attname not in deferred_fields
            ]
            db_instance_qs = db_instance_qs.only(*fields)

        db_instance = db_instance_qs.get()
        non_loaded_fields = db_instance.get_deferred_fields()
        for field in self._meta.concrete_fields:
            if field.attname in non_loaded_fields:
                # This field wasn't refreshed - skip ahead.
                continue
            setattr(self, field.attname, getattr(db_instance, field.attname))
            # Clear cached foreign keys.
            if field.is_relation and field.is_cached(self):
                field.delete_cached_value(self)

        # Clear cached relations.
        for field in self._meta.related_objects:
            if field.is_cached(self):
                field.delete_cached_value(self)

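    # Usage sketch for refresh_from_db() (the "title" attname is hypothetical):
    #
    #   article.refresh_from_db()                  # reload all non-deferred fields
    #   article.refresh_from_db(fields=["title"])  # reload a single field
    #
    # Passing a relation or transform such as "author__name" raises ValueError.
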
    def serializable_value(self, field_name):
        """
        Return the value of the field name for this instance. If the field is
        a foreign key, return the id value instead of the object. If there's
        no Field object with this name on the model, return the model
        attribute's value.

        Used to serialize a field's value (in the serializer, or form output,
        for example). Normally, you would just access the attribute directly
        and not use this method.
        """
        try:
            field = self._meta.get_field(field_name)
        except FieldDoesNotExist:
            return getattr(self, field_name)
        return getattr(self, field.attname)

    def save(
        self,
        *,
        clean_and_validate=True,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        self._prepare_related_fields_for_save(operation_name="save")

        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # If update_fields is empty, skip the save. We also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If this model is deferred, automatically do an "update_fields" save
        # on the loaded fields.
        elif not force_insert and deferred_fields:
            field_names = set()
            for field in self._meta.concrete_fields:
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

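    # Usage sketch for save() (field names are hypothetical):
    #
    #   article.title = "New title"
    #   article.save(update_fields=["title"])     # UPDATE only the named column
    #   article.save(clean_and_validate=False)    # skip full_clean() before saving
    #
    # Passing update_fields names that aren't concrete, non-pk fields raises
    # ValueError, as enforced above.
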
    def save_base(
        self,
        *,
        raw=False,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.

        The 'raw' argument tells save_base not to save any parent models
        and not to change any values before the save. This is used by
        fixture loading.
        """
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        with transaction.mark_for_rollback_on_error():
            self._save_table(
                raw,
                cls,
                force_insert,
                force_update,
                update_fields,
            )
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False

    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        pk_val = self._get_pk_val(meta)
        if pk_val is None:
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = pk_val is not None
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.pk.default
            and meta.pk.default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            fields = meta.local_concrete_fields
            if not pk_set:
                fields = [f for f in fields if f is not meta.auto_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(cls._base_manager, fields, returning_fields, raw)
            if results:
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated

    def _do_update(self, base_qs, pk_val, values, update_fields, forced_update):
        """
        Try to update the model. Return True if the model was updated (if an
        update query was done and a matching row was found in the DB).
        """
        filtered = base_qs.filter(pk=pk_val)
        if not values:
            # We can end up here when saving a model in an inheritance chain
            # where update_fields doesn't target any field in the current
            # model. In that case we just say the update succeeded. Another
            # case ending up here is a model with just a PK - in that case
            # check that the PK still exists.
            return update_fields is not None or filtered.exists()
        return filtered._update(values) > 0

    def _do_insert(self, manager, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
        return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            raw=raw,
        )

    def _prepare_related_fields_for_save(self, operation_name, fields=None):
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the
        # save would result in silent data loss.
        for field in self._meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.pk is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)

    def delete(self):
        if self.pk is None:
            raise ValueError(
                f"{self._meta.object_name} object can't be deleted because its {self._meta.pk.attname} attribute is set "
                "to None."
            )
        collector = Collector(origin=self)
        collector.collect([self])
        return collector.delete()

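    # Usage sketch for delete(): collects the instance (plus anything that
    # cascades) and removes it; calling it on an instance whose pk is None
    # raises ValueError, as enforced above.
    #
    #   article.delete()
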
    def _get_FIELD_display(self, field):
        value = getattr(self, field.attname)
        choices_dict = dict(make_hashable(field.flatchoices))
        # force_str() to coerce lazy strings.
        return force_str(
            choices_dict.get(make_hashable(value), value), strings_only=True
        )

    def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
        if not self.pk:
            raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
        op = "gt" if is_next else "lt"
        order = "" if is_next else "-"
        param = getattr(self, field.attname)
        q = Q.create([(field.name, param), (f"pk__{op}", self.pk)], connector=Q.AND)
        q = Q.create([q, (f"{field.name}__{op}", param)], connector=Q.OR)
        qs = (
            self.__class__._default_manager.filter(**kwargs)
            .filter(q)
            .order_by(f"{order}{field.name}", f"{order}pk")
        )
        try:
            return qs[0]
        except IndexError:
            raise self.DoesNotExist(
                f"{self.__class__._meta.object_name} matching query does not exist."
            )

    def _get_field_value_map(self, meta, exclude=None):
        if exclude is None:
            exclude = set()
        meta = meta or self._meta
        return {
            field.name: Value(getattr(self, field.attname), field)
            for field in meta.local_concrete_fields
            if field.name not in exclude
        }

    def prepare_database_save(self, field):
        if self.pk is None:
            raise ValueError(
                f"Unsaved model instance {self!r} cannot be used in an ORM query."
            )
        return getattr(self, field.remote_field.get_related_field().attname)

    def clean(self):
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.
        """
        pass

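    # Sketch of a clean() override in a subclass (the model and its fields are
    # hypothetical); errors raised here are collected by full_clean() under
    # NON_FIELD_ERRORS:
    #
    #   def clean(self):
    #       if self.start_date and self.end_date and self.start_date > self.end_date:
    #           raise ValidationError("start_date must be before end_date")
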
    def validate_unique(self, exclude=None):
        """
        Check unique constraints on the model and raise ValidationError if any
        failed.
        """
        unique_checks = self._get_unique_checks(exclude=exclude)

        if errors := self._perform_unique_checks(unique_checks):
            raise ValidationError(errors)

    def _get_unique_checks(self, exclude=None):
        """
        Return a list of checks to perform. Since validate_unique() could be
        called from a ModelForm, some fields may have been excluded; we can't
        perform a unique check on a model that is missing fields involved
        in that check. Fields that did not validate should also be excluded,
        but they need to be passed in via the exclude argument.
        """
        if exclude is None:
            exclude = set()
        unique_checks = []

        # Gather a list of checks for fields declared as unique and add them to
        # the list of checks.

        fields_with_class = [(self.__class__, self._meta.local_fields)]

        for model_class, fields in fields_with_class:
            for f in fields:
                name = f.name
                if name in exclude:
                    continue
                if f.primary_key:
                    unique_checks.append((model_class, (name,)))

        return unique_checks

    def _perform_unique_checks(self, unique_checks):
        errors = {}

        for model_class, unique_check in unique_checks:
            # Try to look up an existing object with the same values as this
            # object's values for all the unique fields.

            lookup_kwargs = {}
            for field_name in unique_check:
                f = self._meta.get_field(field_name)
                lookup_value = getattr(self, f.attname)
                # TODO: Handle multiple backends with different feature flags.
                if lookup_value is None:
                    # no value, skip the lookup
                    continue
                if f.primary_key and not self._state.adding:
                    # no need to check for unique primary key when editing
                    continue
                lookup_kwargs[str(field_name)] = lookup_value

            # some fields were skipped, no reason to do the check
            if len(unique_check) != len(lookup_kwargs):
                continue

            qs = model_class._default_manager.filter(**lookup_kwargs)

            # Exclude the current object from the query if we are editing an
            # instance (as opposed to creating a new one)
            # Note that we need to use the pk as defined by model_class, not
            # self.pk. These can be different fields because model inheritance
            # allows a single model to have effectively multiple primary keys.
            # Refs #17615.
            model_class_pk = self._get_pk_val(model_class._meta)
            if not self._state.adding and model_class_pk is not None:
                qs = qs.exclude(pk=model_class_pk)
            if qs.exists():
                if len(unique_check) == 1:
                    key = unique_check[0]
                else:
                    key = NON_FIELD_ERRORS
                errors.setdefault(key, []).append(
                    self.unique_error_message(model_class, unique_check)
                )

        return errors

    def unique_error_message(self, model_class, unique_check):
        opts = model_class._meta

        params = {
            "model": self,
            "model_class": model_class,
            "model_name": opts.model_name,
            "unique_check": unique_check,
        }

        if len(unique_check) == 1:
            field = opts.get_field(unique_check[0])
            params["field_label"] = field.name
            return ValidationError(
                message=field.error_messages["unique"],
                code="unique",
                params=params,
            )
        else:
            field_names = [opts.get_field(f).name for f in unique_check]

            # Put an "and" before the last one
            field_names[-1] = f"and {field_names[-1]}"

            if len(field_names) > 2:
                # Comma join if more than 2
                params["field_label"] = ", ".join(field_names)
            else:
                # Just a space if there are only 2
                params["field_label"] = " ".join(field_names)

            # Use the first field as the message format...
            message = opts.get_field(unique_check[0]).error_messages["unique"]

            return ValidationError(
                message=message,
                code="unique",
                params=params,
            )

    def get_constraints(self):
        constraints = [(self.__class__, self._meta.constraints)]
        return constraints

    def validate_constraints(self, exclude=None):
        constraints = self.get_constraints()

        errors = {}
        for model_class, model_constraints in constraints:
            for constraint in model_constraints:
                try:
                    constraint.validate(model_class, self, exclude=exclude)
                except ValidationError as e:
                    if (
                        getattr(e, "code", None) == "unique"
                        and len(constraint.fields) == 1
                    ):
                        errors.setdefault(constraint.fields[0], []).append(e)
                    else:
                        errors = e.update_error_dict(errors)
        if errors:
            raise ValidationError(errors)

    def full_clean(
        self, *, exclude=None, validate_unique=True, validate_constraints=True
    ):
        """
        Call clean_fields(), clean(), validate_unique(), and
        validate_constraints() on the model. Raise a ValidationError for any
        errors that occur.
        """
        errors = {}
        if exclude is None:
            exclude = set()
        else:
            exclude = set(exclude)

        try:
            self.clean_fields(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Form.clean() is run even if other validation fails, so do the
        # same with Model.clean() for consistency.
        try:
            self.clean()
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Run unique checks, but only for fields that passed validation.
        if validate_unique:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_unique(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        # Run constraints checks, but only for fields that passed validation.
        if validate_constraints:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_constraints(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        if errors:
            raise ValidationError(errors)

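    # Usage sketch for full_clean(): run all validation and collect errors per
    # field (the excluded field name below is hypothetical).
    #
    #   try:
    #       article.full_clean(exclude={"slug"}, validate_constraints=False)
    #   except ValidationError as e:
    #       print(e)
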
    def clean_fields(self, exclude=None):
        """
        Clean all fields and raise a ValidationError containing a dict
        of all validation errors if any occur.
        """
        if exclude is None:
            exclude = set()

        errors = {}
        for f in self._meta.fields:
            if f.name in exclude:
                continue
            # Skip validation for empty fields with required=False. The developer
            # is responsible for making sure they have a valid value.
            raw_value = getattr(self, f.attname)
            if not f.required and raw_value in f.empty_values:
                continue
            try:
                setattr(self, f.attname, f.clean(raw_value, self))
            except ValidationError as e:
                errors[f.name] = e.error_list

        if errors:
            raise ValidationError(errors)

    @classmethod
    def check(cls, **kwargs):
        errors = [
            *cls._check_managers(**kwargs),
        ]

        database = kwargs.get("database", False)
        errors += [
            *cls._check_fields(**kwargs),
            *cls._check_m2m_through_same_relationship(),
            *cls._check_long_column_names(database),
        ]
        clash_errors = (
            *cls._check_id_field(),
            *cls._check_field_name_clashes(),
            *cls._check_model_name_db_lookup_clashes(),
            *cls._check_property_name_related_field_accessor_clashes(),
            *cls._check_single_primary_key(),
        )
        errors.extend(clash_errors)
        # If there are field name clashes, hide consequent column name
        # clashes.
        if not clash_errors:
            errors.extend(cls._check_column_name_clashes())
        errors += [
            *cls._check_indexes(database),
            *cls._check_ordering(),
            *cls._check_constraints(database),
            *cls._check_db_table_comment(database),
        ]

        return errors

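    # Sketch: preflight checks can also be invoked directly on a model class;
    # the framework normally runs them for you. `database=True` is assumed
    # here to additionally run the checks gated on the `database` kwarg above.
    #
    #   for issue in MyModel.check(database=True):  # MyModel is hypothetical
    #       print(issue)
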
    @classmethod
    def _check_db_table_comment(cls, database):
        if not cls._meta.db_table_comment or not database:
            return []
        errors = []
        if not (
            db_connection.features.supports_comments
            or "supports_comments" in cls._meta.required_db_features
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support comments on "
                    f"tables (db_table_comment).",
                    obj=cls,
                    id="models.W046",
                )
            )
        return errors

    @classmethod
    def _check_managers(cls, **kwargs):
        """Perform all manager checks."""
        errors = []
        for manager in cls._meta.managers:
            errors.extend(manager.check(**kwargs))
        return errors

    @classmethod
    def _check_fields(cls, **kwargs):
        """Perform all field checks."""
        errors = []
        for field in cls._meta.local_fields:
            errors.extend(field.check(**kwargs))
        for field in cls._meta.local_many_to_many:
            errors.extend(field.check(from_model=cls, **kwargs))
        return errors

    @classmethod
    def _check_m2m_through_same_relationship(cls):
        """Check that no relationship model is used by more than one m2m field."""

        errors = []
        seen_intermediary_signatures = []

        fields = cls._meta.local_many_to_many

        # Skip when the target model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))

        # Skip when the relationship model wasn't found.
        fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))

        for f in fields:
            signature = (
                f.remote_field.model,
                cls,
                f.remote_field.through,
                f.remote_field.through_fields,
            )
            if signature in seen_intermediary_signatures:
                errors.append(
                    preflight.Error(
                        "The model has two identical many-to-many relations "
                        f"through the intermediate model '{f.remote_field.through._meta.label}'.",
                        obj=cls,
                        id="models.E003",
                    )
                )
            else:
                seen_intermediary_signatures.append(signature)
        return errors

    @classmethod
    def _check_id_field(cls):
        """Check that an `id` field is also a primary key."""
        fields = [
            f for f in cls._meta.local_fields if f.name == "id" and f != cls._meta.pk
        ]
        # fields is empty or consists of the invalid "id" field
        if fields and not fields[0].primary_key and cls._meta.pk.name == "id":
            return [
                preflight.Error(
                    "'id' can only be used as a field name if the field also "
                    "sets 'primary_key=True'.",
                    obj=cls,
                    id="models.E004",
                )
            ]
        else:
            return []

    @classmethod
    def _check_field_name_clashes(cls):
        """Forbid field shadowing in multi-table inheritance."""
        errors = []
        used_fields = {}  # name or attname -> field

        for f in cls._meta.local_fields:
            clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
            # Note that we may detect a clash between a user-defined non-unique
            # field "id" and the automatically added unique field "id", both
            # defined at the same model. This special case is considered in
            # _check_id_field and here we ignore it.
            id_conflict = (
                f.name == "id" and clash and clash.name == "id" and clash.model == cls
            )
            if clash and not id_conflict:
                errors.append(
                    preflight.Error(
                        f"The field '{f.name}' clashes with the field '{clash.name}' "
                        f"from model '{clash.model._meta}'.",
                        obj=f,
                        id="models.E006",
                    )
                )
            used_fields[f.name] = f
            used_fields[f.attname] = f

        return errors

    @classmethod
    def _check_column_name_clashes(cls):
        # Store a list of column names which have already been used by other fields.
        used_column_names = []
        errors = []

        for f in cls._meta.local_fields:
            _, column_name = f.get_attname_column()

            # Ensure the column name is not already in use.
            if column_name and column_name in used_column_names:
                errors.append(
                    preflight.Error(
                        f"Field '{f.name}' has column name '{column_name}' that is used by "
                        "another field.",
                        hint="Specify a 'db_column' for the field.",
                        obj=cls,
                        id="models.E007",
                    )
                )
            else:
                used_column_names.append(column_name)

        return errors

    @classmethod
    def _check_model_name_db_lookup_clashes(cls):
        errors = []
        model_name = cls.__name__
        if model_name.startswith("_") or model_name.endswith("_"):
            errors.append(
                preflight.Error(
                    f"The model name '{model_name}' cannot start or end with an underscore "
                    "as it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.E023",
                )
            )
        elif LOOKUP_SEP in model_name:
            errors.append(
                preflight.Error(
                    f"The model name '{model_name}' cannot contain double underscores as "
                    "it collides with the query lookup syntax.",
                    obj=cls,
                    id="models.E024",
                )
            )
        return errors

    @classmethod
    def _check_property_name_related_field_accessor_clashes(cls):
        errors = []
        property_names = cls._meta._property_names
        related_field_accessors = (
            f.get_attname()
            for f in cls._meta._get_fields(reverse=False)
            if f.is_relation and f.related_model is not None
        )
        for accessor in related_field_accessors:
            if accessor in property_names:
                errors.append(
                    preflight.Error(
                        f"The property '{accessor}' clashes with a related field "
                        "accessor.",
                        obj=cls,
                        id="models.E025",
                    )
                )
        return errors

    @classmethod
    def _check_single_primary_key(cls):
        errors = []
        if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1:
            errors.append(
                preflight.Error(
                    "The model cannot have more than one field with "
                    "'primary_key=True'.",
                    obj=cls,
                    id="models.E026",
                )
            )
        return errors

    @classmethod
    def _check_indexes(cls, database):
        """Check fields, names, and conditions of indexes."""
        errors = []
        references = set()
        for index in cls._meta.indexes:
            # Index name can't start with an underscore or a number, restricted
            # for cross-database compatibility with Oracle.
            if index.name[0] == "_" or index.name[0].isdigit():
                errors.append(
                    preflight.Error(
                        f"The index name '{index.name}' cannot start with an underscore "
                        "or a number.",
                        obj=cls,
                        id="models.E033",
                    ),
                )
            if len(index.name) > index.max_name_length:
                errors.append(
                    preflight.Error(
                        "The index name '%s' cannot be longer than %d "  # noqa: UP031
                        "characters." % (index.name, index.max_name_length),
                        obj=cls,
                        id="models.E034",
                    ),
                )
            if index.contains_expressions:
                for expression in index.expressions:
                    references.update(
                        ref[0] for ref in cls._get_expr_references(expression)
                    )
        if (
            database
            and not (
                db_connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            )
            and any(index.condition is not None for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes with conditions.",
                    hint=(
                        "Conditions will be ignored. Silence this warning "
                        "if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W037",
                )
            )
        if (
            database
            and not (
                db_connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            )
            and any(index.include for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes with non-key columns.",
                    hint=(
                        "Non-key columns will be ignored. Silence this "
                        "warning if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W040",
                )
            )
        if (
            database
            and not (
                db_connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            )
            and any(index.contains_expressions for index in cls._meta.indexes)
        ):
            errors.append(
                preflight.Warning(
                    f"{db_connection.display_name} does not support indexes on expressions.",
                    hint=(
                        "An index won't be created. Silence this warning "
                        "if you don't care about it."
                    ),
                    obj=cls,
                    id="models.W043",
                )
            )
        fields = [
            field for index in cls._meta.indexes for field, _ in index.fields_orders
        ]
        fields += [include for index in cls._meta.indexes for include in index.include]
        fields += references
        errors.extend(cls._check_local_fields(fields, "indexes"))
        return errors

    @classmethod
    def _check_local_fields(cls, fields, option):
        from plain import models

        # In order to avoid hitting the relation tree prematurely, we use our
        # own fields_map instead of using get_field()
        forward_fields_map = {}
        for field in cls._meta._get_fields(reverse=False):
            forward_fields_map[field.name] = field
            if hasattr(field, "attname"):
                forward_fields_map[field.attname] = field

        errors = []
        for field_name in fields:
            try:
                field = forward_fields_map[field_name]
            except KeyError:
                errors.append(
                    preflight.Error(
                        f"'{option}' refers to the nonexistent field '{field_name}'.",
                        obj=cls,
                        id="models.E012",
                    )
                )
            else:
                if isinstance(field.remote_field, models.ManyToManyRel):
                    errors.append(
                        preflight.Error(
                            f"'{option}' refers to a ManyToManyField '{field_name}', but "
                            f"ManyToManyFields are not permitted in '{option}'.",
                            obj=cls,
                            id="models.E013",
                        )
                    )
                elif field not in cls._meta.local_fields:
                    errors.append(
                        preflight.Error(
                            f"'{option}' refers to field '{field_name}' which is not local to model "
                            f"'{cls._meta.object_name}'.",
                            hint="This issue may be caused by multi-table inheritance.",
                            obj=cls,
                            id="models.E016",
                        )
                    )
        return errors

    @classmethod
    def _check_ordering(cls):
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?
        """

        if not cls._meta.ordering:
            return []

        if not isinstance(cls._meta.ordering, list | tuple):
            return [
                preflight.Error(
                    "'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="models.E014",
                )
            ]

        errors = []
        fields = cls._meta.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    # pk is an alias that won't be found by opts.get_field.
                    if part == "pk":
                        fld = _cls._meta.pk
                    else:
                        fld = _cls._meta.get_field(part)
                    if fld.is_relation:
                        _cls = fld.path_infos[-1].to_opts.model
                    else:
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    if fld is None or (
                        fld.get_transform(part) is None and fld.get_lookup(part) is None
                    ):
                        errors.append(
                            preflight.Error(
                                "'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="models.E015",
                            )
                        )

        # Skip ordering on pk. This is always a valid order_by field
        # but is an alias and therefore won't be found by opts.get_field.
        fields = {f for f in fields if f != "pk"}

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        opts = cls._meta
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(opts.fields, opts.related_objects)
            )
        )

        invalid_fields.extend(fields - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                preflight.Error(
                    "'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="models.E015",
                )
            )
        return errors

    @classmethod
    def _check_long_column_names(cls, database):
        """
        Check that any auto-generated column names are shorter than the limits
        for each database in which the model will be created.
        """
        if not database:
            return []
        errors = []
        allowed_len = None

        max_name_length = db_connection.ops.max_name_length()
        if max_name_length is not None and not db_connection.features.truncates_names:
            allowed_len = max_name_length

        if allowed_len is None:
            return errors

        for f in cls._meta.local_fields:
            _, column_name = f.get_attname_column()

            # Check if auto-generated name for the field is too long
            # for the database.
            if (
                f.db_column is None
                and column_name is not None
                and len(column_name) > allowed_len
            ):
                errors.append(
                    preflight.Error(
                        f'Autogenerated column name too long for field "{column_name}". '
                        f'Maximum length is "{allowed_len}" for the database.',
                        hint="Set the column name manually using 'db_column'.",
                        obj=cls,
                        id="models.E018",
                    )
                )

        for f in cls._meta.local_many_to_many:
            # Skip nonexistent models.
            if isinstance(f.remote_field.through, str):
                continue

            # Check if auto-generated name for the M2M field is too long
            # for the database.
            for m2m in f.remote_field.through._meta.local_fields:
                _, rel_name = m2m.get_attname_column()
                if (
                    m2m.db_column is None
                    and rel_name is not None
                    and len(rel_name) > allowed_len
                ):
                    errors.append(
                        preflight.Error(
                            "Autogenerated column name too long for M2M field "
                            f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
                            hint=(
                                "Use 'through' to create a separate model for "
                                "M2M and then set column_name using 'db_column'."
                            ),
                            obj=cls,
                            id="models.E019",
                        )
                    )

        return errors

    @classmethod
    def _get_expr_references(cls, expr):
        if isinstance(expr, Q):
            for child in expr.children:
                if isinstance(child, tuple):
                    lookup, value = child
                    yield tuple(lookup.split(LOOKUP_SEP))
                    yield from cls._get_expr_references(value)
                else:
                    yield from cls._get_expr_references(child)
        elif isinstance(expr, F):
            yield tuple(expr.name.split(LOOKUP_SEP))
        elif hasattr(expr, "get_source_expressions"):
            for src_expr in expr.get_source_expressions():
                yield from cls._get_expr_references(src_expr)

    @classmethod
    def _check_constraints(cls, database):
        errors = []
        if database:
            if not (
                db_connection.features.supports_table_check_constraints
                or "supports_table_check_constraints" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, CheckConstraint)
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support check constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W027",
                    )
                )
            if not (
                db_connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.condition is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints with "
                        "conditions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W036",
                    )
                )
            if not (
                db_connection.features.supports_deferrable_unique_constraints
                or "supports_deferrable_unique_constraints"
                in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.deferrable is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support deferrable unique constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W038",
                    )
                )
            if not (
                db_connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint) and constraint.include
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints with non-key "
                        "columns.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W039",
                    )
                )
            if not (
                db_connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.contains_expressions
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{db_connection.display_name} does not support unique constraints on "
                        "expressions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W044",
                    )
                )
            fields = set(
                chain.from_iterable(
                    (*constraint.fields, *constraint.include)
                    for constraint in cls._meta.constraints
                    if isinstance(constraint, UniqueConstraint)
                )
            )
            references = set()
            for constraint in cls._meta.constraints:
                if isinstance(constraint, UniqueConstraint):
                    if (
                        db_connection.features.supports_partial_indexes
                        or "supports_partial_indexes" not in cls._meta.required_db_features
                    ) and isinstance(constraint.condition, Q):
                        references.update(cls._get_expr_references(constraint.condition))
                    if (
                        db_connection.features.supports_expression_indexes
                        or "supports_expression_indexes"
                        not in cls._meta.required_db_features
                    ) and constraint.contains_expressions:
                        for expression in constraint.expressions:
                            references.update(cls._get_expr_references(expression))
                elif isinstance(constraint, CheckConstraint):
                    if (
                        db_connection.features.supports_table_check_constraints
                        or "supports_table_check_constraints"
                        not in cls._meta.required_db_features
                    ):
                        if isinstance(constraint.check, Q):
                            references.update(cls._get_expr_references(constraint.check))
                        if any(
                            isinstance(expr, RawSQL) for expr in constraint.check.flatten()
                        ):
                            errors.append(
                                preflight.Warning(
                                    f"Check constraint {constraint.name!r} contains "
                                    f"RawSQL() expression and won't be validated "
                                    f"during the model full_clean().",
                                    hint=(
                                        "Silence this warning if you don't care about it."
                                    ),
                                    obj=cls,
                                    id="models.W045",
                                ),
                            )
            for field_name, *lookups in references:
                # pk is an alias that won't be found by opts.get_field.
                if field_name != "pk":
                    fields.add(field_name)
                if not lookups:
                    # If it has no lookups it cannot result in a JOIN.
                    continue
                try:
                    if field_name == "pk":
                        field = cls._meta.pk
                    else:
                        field = cls._meta.get_field(field_name)
                    if not field.is_relation or field.many_to_many or field.one_to_many:
                        continue
                except FieldDoesNotExist:
                    continue
                # JOIN must happen at the first lookup.
                first_lookup = lookups[0]
                if (
                    hasattr(field, "get_transform")
                    and hasattr(field, "get_lookup")
                    and field.get_transform(first_lookup) is None
                    and field.get_lookup(first_lookup) is None
                ):
                    errors.append(
                        preflight.Error(
                            f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                            obj=cls,
                            id="models.E041",
                        )
                    )
            errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors


########
# MISC #
########


def model_unpickle(model_id):
    """Used to unpickle Model subclasses with deferred fields."""
    if isinstance(model_id, tuple):
        model = models_registry.get_model(*model_id)
    else:
        # Backwards compat - the model was cached directly in earlier versions.
        model = model_id
    return model.__new__(model)


model_unpickle.__safe_for_unpickle__ = True