plain.models
Model your data and store it in a database.
# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField
class User(models.Model):
email = models.EmailField()
password = PasswordField()
is_admin = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.email
Create, update, and delete instances of your models:
from .models import User
# Create a new user
user = User.objects.create(
email="[email protected]",
password="password",
)
# Update a user
user.email = "[email protected]"
user.save()
# Delete a user
user.delete()
# Query for users
admin_users = User.objects.filter(is_admin=True)
Installation
# app/settings.py
INSTALLED_PACKAGES = [
...
"plain.models",
]
To connect to a database, you can provide a DATABASE_URL
environment variable.
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
Or you can manually define the DATABASES
setting.
# app/settings.py
DATABASES = {
"default": {
"ENGINE": "plain.models.backends.postgresql",
"NAME": "dbname",
"USER": "user",
"PASSWORD": "password",
"HOST": "localhost",
"PORT": "5432",
}
}
Multiple backends are supported, including Postgres, MySQL, and SQLite.
Querying
Migrations
Fields
Validation
Indexes and constraints
Managers
Forms
1import copy
2import inspect
3import warnings
4from itertools import chain
5
6import plain.runtime
7from plain import preflight
8from plain.exceptions import (
9 NON_FIELD_ERRORS,
10 FieldDoesNotExist,
11 MultipleObjectsReturned,
12 ObjectDoesNotExist,
13 ValidationError,
14)
15from plain.models import models_registry, transaction
16from plain.models.constants import LOOKUP_SEP
17from plain.models.constraints import CheckConstraint, UniqueConstraint
18from plain.models.db import (
19 PLAIN_VERSION_PICKLE_KEY,
20 DatabaseError,
21 connection,
22 connections,
23 router,
24)
25from plain.models.deletion import Collector
26from plain.models.expressions import RawSQL, Value
27from plain.models.fields import NOT_PROVIDED
28from plain.models.fields.related import (
29 ForeignObjectRel,
30)
31from plain.models.manager import Manager
32from plain.models.options import Options
33from plain.models.query import F, Q
34from plain.packages import packages_registry
35from plain.utils.encoding import force_str
36from plain.utils.hashable import make_hashable
37
38
class Deferred:
    """Sentinel type for field values that were deferred (not loaded from the DB)."""

    def __repr__(self):
        return "<Deferred field>"

    # str() and repr() render identically for the sentinel.
    __str__ = __repr__


# Singleton sentinel used throughout this module to mark deferred values.
DEFERRED = Deferred()
48
49
50def _has_contribute_to_class(value):
51 # Only call contribute_to_class() if it's bound.
52 return not inspect.isclass(value) and hasattr(value, "contribute_to_class")
53
54
class ModelBase(type):
    """Metaclass for all models."""

    def __new__(cls, name, bases, attrs, **kwargs):
        """
        Build a new model class: validate inheritance, attach _meta and
        per-model exceptions, install contributable attributes (fields,
        managers), and finalize via _prepare().
        """
        # Don't do any of this for the root models.Model class.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        for base in bases:
            # Models are required to directly inherit from model.Model, not a subclass of it.
            if issubclass(base, Model) and base is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {base}"
                )
            # Meta has to be defined on the model itself.
            if hasattr(base, "Meta"):
                raise TypeError(
                    "Meta can only be defined on a model itself, not a parent class: "
                    f"{name} extends {base}"
                )

        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        new_class._setup_meta()
        new_class._add_exceptions()

        # Now go back over all the attrs on this class see if they have a contribute_to_class() method.
        # Attributes with contribute_to_class are fields, meta options, and managers.
        for attr_name, attr_value in inspect.getmembers(new_class):
            if attr_name.startswith("_"):
                continue

            if _has_contribute_to_class(attr_value):
                if attr_name not in attrs:
                    # If the field came from an inherited class/mixin,
                    # we need to make a copy of it to avoid altering the
                    # original class and other classes that inherit from it.
                    field = copy.deepcopy(attr_value)
                else:
                    field = attr_value
                new_class.add_to_class(attr_name, field)

        new_class._meta.concrete_model = new_class

        # Copy indexes so that index names are unique when models extend another class.
        new_class._meta.indexes = [
            copy.deepcopy(idx) for idx in new_class._meta.indexes
        ]

        new_class._prepare()

        return new_class

    def add_to_class(cls, name, value):
        # Install `value` on the class, letting objects that implement
        # contribute_to_class() (fields, meta options, managers) hook
        # themselves in; anything else is plain setattr.
        if _has_contribute_to_class(value):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

    def _setup_meta(cls):
        """Build and attach cls._meta (Options) from the inner Meta class."""
        name = cls.__name__
        module = cls.__module__

        # The model's Meta class, if it has one.
        meta = getattr(cls, "Meta", None)

        # Look for an application configuration to attach the model to.
        package_config = packages_registry.get_containing_package_config(module)

        package_label = getattr(meta, "package_label", None)
        if package_label is None:
            if package_config is None:
                raise RuntimeError(
                    f"Model class {module}.{name} doesn't declare an explicit "
                    "package_label and isn't in an application in "
                    "INSTALLED_PACKAGES."
                )
            else:
                package_label = package_config.label

        cls.add_to_class("_meta", Options(meta, package_label))

    def _add_exceptions(cls):
        """Attach per-model DoesNotExist / MultipleObjectsReturned subclasses."""
        # Each model gets its own exception subclasses so callers can catch
        # them per-model rather than catching the shared base classes.
        cls.DoesNotExist = type(
            "DoesNotExist",
            (ObjectDoesNotExist,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.DoesNotExist",
            },
        )

        cls.MultipleObjectsReturned = type(
            "MultipleObjectsReturned",
            (MultipleObjectsReturned,),
            {
                "__module__": cls.__module__,
                "__qualname__": f"{cls.__qualname__}.MultipleObjectsReturned",
            },
        )

    def _prepare(cls):
        """Create some methods once self._meta has been populated."""
        opts = cls._meta
        opts._prepare(cls)

        # Give the class a docstring -- its definition.
        if cls.__doc__ is None:
            cls.__doc__ = "{}({})".format(
                cls.__name__,
                ", ".join(f.name for f in opts.fields),
            )

        if not opts.managers:
            # A field named "objects" would be shadowed by the default manager.
            if any(f.name == "objects" for f in opts.fields):
                raise ValueError(
                    f"Model {cls.__name__} must specify a custom Manager, because it has a "
                    "field named 'objects'."
                )
            manager = Manager()
            manager.auto_created = True
            cls.add_to_class("objects", manager)

        # Set the name of _meta.indexes. This can't be done in
        # Options.contribute_to_class() because fields haven't been added to
        # the model at that point.
        for index in cls._meta.indexes:
            if not index.name:
                index.set_name_with_model(cls)

    @property
    def _base_manager(cls):
        # Manager used by internal machinery (save/delete/refresh paths).
        return cls._meta.base_manager

    @property
    def _default_manager(cls):
        # Manager used for user-facing queries.
        return cls._meta.default_manager
192
193
class ModelStateFieldsCacheDescriptor:
    """Descriptor that lazily creates a per-instance ``fields_cache`` dict."""

    def __get__(self, instance, cls=None):
        # Class-level access returns the descriptor itself.
        if instance is None:
            return self
        # First instance access: materialize the cache and pin it on the
        # instance so subsequent lookups bypass this descriptor.
        cache = {}
        instance.fields_cache = cache
        return cache
200
201
class ModelState:
    """Store model instance state."""

    # Database alias the instance was loaded from / saved to; None if the
    # instance has never touched a database.
    db = None
    # If true, uniqueness validation checks will consider this a new, unsaved
    # object. Necessary for correct validation of new instances of objects with
    # explicit (non-auto) PKs. This impacts validation only; it has no effect
    # on the actual save.
    adding = True
    # Lazily-created per-instance cache of related-object values.
    fields_cache = ModelStateFieldsCacheDescriptor()
212
213
214class Model(metaclass=ModelBase):
    def __init__(self, *args, **kwargs):
        """
        Build a model instance from positional and/or keyword field values.

        Positional args map onto concrete fields in declaration order;
        keyword args are matched by field name/attname, then by property
        name. Fields not supplied fall back to their defaults. DEFERRED
        values are skipped so deferred loading leaves no attribute set.
        """
        # Alias some things as locals to avoid repeat global lookups
        cls = self.__class__
        opts = self._meta
        _setattr = setattr
        _DEFERRED = DEFERRED

        # Set up the storage for instance state
        self._state = ModelState()

        # There is a rather weird disparity here; if kwargs, it's set, then args
        # overrides it. It should be one or the other; don't duplicate the work
        # The reason for the kwargs check is that standard iterator passes in by
        # args, and instantiation for iteration is 33% faster.
        if len(args) > len(opts.concrete_fields):
            # Daft, but matches old exception sans the err msg.
            raise IndexError("Number of args exceeds number of fields")

        if not kwargs:
            fields_iter = iter(opts.concrete_fields)
            # The ordering of the zip calls matter - zip throws StopIteration
            # when an iter throws it. So if the first iter throws it, the second
            # is *not* consumed. We rely on this, so don't change the order
            # without changing the logic.
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
        else:
            # Slower, kwargs-ready version.
            fields_iter = iter(opts.fields)
            for val, field in zip(args, fields_iter):
                if val is _DEFERRED:
                    continue
                _setattr(self, field.attname, val)
                # Reject a field supplied both positionally and by keyword.
                if kwargs.pop(field.name, NOT_PROVIDED) is not NOT_PROVIDED:
                    raise TypeError(
                        f"{cls.__qualname__}() got both positional and "
                        f"keyword arguments for field '{field.name}'."
                    )

        # Now we're left with the unprocessed fields that *must* come from
        # keywords, or default.

        for field in fields_iter:
            is_related_object = False
            # Virtual field
            if field.attname not in kwargs and field.column is None:
                continue
            if kwargs:
                if isinstance(field.remote_field, ForeignObjectRel):
                    try:
                        # Assume object instance was passed in.
                        rel_obj = kwargs.pop(field.name)
                        is_related_object = True
                    except KeyError:
                        try:
                            # Object instance wasn't passed in -- must be an ID.
                            val = kwargs.pop(field.attname)
                        except KeyError:
                            val = field.get_default()
                else:
                    try:
                        val = kwargs.pop(field.attname)
                    except KeyError:
                        # This is done with an exception rather than the
                        # default argument on pop because we don't want
                        # get_default() to be evaluated, and then not used.
                        # Refs #12057.
                        val = field.get_default()
            else:
                val = field.get_default()

            if is_related_object:
                # If we are passed a related instance, set it using the
                # field.name instead of field.attname (e.g. "user" instead of
                # "user_id") so that the object gets properly cached (and type
                # checked) by the RelatedObjectDescriptor.
                if rel_obj is not _DEFERRED:
                    _setattr(self, field.name, rel_obj)
            else:
                if val is not _DEFERRED:
                    _setattr(self, field.attname, val)

        if kwargs:
            # Any remaining kwargs must correspond to properties or virtual
            # fields; anything else is an error reported below.
            property_names = opts._property_names
            unexpected = ()
            for prop, value in kwargs.items():
                if prop in property_names:
                    if value is not _DEFERRED:
                        _setattr(self, prop, value)
                else:
                    try:
                        opts.get_field(prop)
                    except FieldDoesNotExist:
                        unexpected += (prop,)
                    else:
                        if value is not _DEFERRED:
                            _setattr(self, prop, value)
            if unexpected:
                unexpected_names = ", ".join(repr(n) for n in unexpected)
                raise TypeError(
                    f"{cls.__name__}() got unexpected keyword arguments: "
                    f"{unexpected_names}"
                )
        super().__init__()
323
324 @classmethod
325 def from_db(cls, db, field_names, values):
326 if len(values) != len(cls._meta.concrete_fields):
327 values_iter = iter(values)
328 values = [
329 next(values_iter) if f.attname in field_names else DEFERRED
330 for f in cls._meta.concrete_fields
331 ]
332 new = cls(*values)
333 new._state.adding = False
334 new._state.db = db
335 return new
336
337 def __repr__(self):
338 return f"<{self.__class__.__name__}: {self}>"
339
340 def __str__(self):
341 return f"{self.__class__.__name__} object ({self.pk})"
342
343 def __eq__(self, other):
344 if not isinstance(other, Model):
345 return NotImplemented
346 if self._meta.concrete_model != other._meta.concrete_model:
347 return False
348 my_pk = self.pk
349 if my_pk is None:
350 return self is other
351 return my_pk == other.pk
352
353 def __hash__(self):
354 if self.pk is None:
355 raise TypeError("Model instances without primary key value are unhashable")
356 return hash(self.pk)
357
358 def __reduce__(self):
359 data = self.__getstate__()
360 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
361 class_id = self._meta.package_label, self._meta.object_name
362 return model_unpickle, (class_id,), data
363
364 def __getstate__(self):
365 """Hook to allow choosing the attributes to pickle."""
366 state = self.__dict__.copy()
367 state["_state"] = copy.copy(state["_state"])
368 state["_state"].fields_cache = state["_state"].fields_cache.copy()
369 # memoryview cannot be pickled, so cast it to bytes and store
370 # separately.
371 _memoryview_attrs = []
372 for attr, value in state.items():
373 if isinstance(value, memoryview):
374 _memoryview_attrs.append((attr, bytes(value)))
375 if _memoryview_attrs:
376 state["_memoryview_attrs"] = _memoryview_attrs
377 for attr, value in _memoryview_attrs:
378 state.pop(attr)
379 return state
380
381 def __setstate__(self, state):
382 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
383 if pickled_version:
384 if pickled_version != plain.runtime.__version__:
385 warnings.warn(
386 f"Pickled model instance's Plain version {pickled_version} does not "
387 f"match the current version {plain.runtime.__version__}.",
388 RuntimeWarning,
389 stacklevel=2,
390 )
391 else:
392 warnings.warn(
393 "Pickled model instance's Plain version is not specified.",
394 RuntimeWarning,
395 stacklevel=2,
396 )
397 if "_memoryview_attrs" in state:
398 for attr, value in state.pop("_memoryview_attrs"):
399 state[attr] = memoryview(value)
400 self.__dict__.update(state)
401
402 def _get_pk_val(self, meta=None):
403 meta = meta or self._meta
404 return getattr(self, meta.pk.attname)
405
    def _set_pk_val(self, value):
        # Write through to the concrete primary-key attribute.
        return setattr(self, self._meta.pk.attname, value)

    # Convenience accessor: `instance.pk` reads/writes the PK field.
    pk = property(_get_pk_val, _set_pk_val)
410
411 def get_deferred_fields(self):
412 """
413 Return a set containing names of deferred fields for this instance.
414 """
415 return {
416 f.attname
417 for f in self._meta.concrete_fields
418 if f.attname not in self.__dict__
419 }
420
421 def refresh_from_db(self, using=None, fields=None):
422 """
423 Reload field values from the database.
424
425 By default, the reloading happens from the database this instance was
426 loaded from, or by the read router if this instance wasn't loaded from
427 any database. The using parameter will override the default.
428
429 Fields can be used to specify which fields to reload. The fields
430 should be an iterable of field attnames. If fields is None, then
431 all non-deferred fields are reloaded.
432
433 When accessing deferred fields of an instance, the deferred loading
434 of the field will call this method.
435 """
436 if fields is None:
437 self._prefetched_objects_cache = {}
438 else:
439 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", ())
440 for field in fields:
441 if field in prefetched_objects_cache:
442 del prefetched_objects_cache[field]
443 fields.remove(field)
444 if not fields:
445 return
446 if any(LOOKUP_SEP in f for f in fields):
447 raise ValueError(
448 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
449 "are not allowed in fields."
450 )
451
452 hints = {"instance": self}
453 db_instance_qs = self.__class__._base_manager.db_manager(
454 using, hints=hints
455 ).filter(pk=self.pk)
456
457 # Use provided fields, if not set then reload all non-deferred fields.
458 deferred_fields = self.get_deferred_fields()
459 if fields is not None:
460 fields = list(fields)
461 db_instance_qs = db_instance_qs.only(*fields)
462 elif deferred_fields:
463 fields = [
464 f.attname
465 for f in self._meta.concrete_fields
466 if f.attname not in deferred_fields
467 ]
468 db_instance_qs = db_instance_qs.only(*fields)
469
470 db_instance = db_instance_qs.get()
471 non_loaded_fields = db_instance.get_deferred_fields()
472 for field in self._meta.concrete_fields:
473 if field.attname in non_loaded_fields:
474 # This field wasn't refreshed - skip ahead.
475 continue
476 setattr(self, field.attname, getattr(db_instance, field.attname))
477 # Clear cached foreign keys.
478 if field.is_relation and field.is_cached(self):
479 field.delete_cached_value(self)
480
481 # Clear cached relations.
482 for field in self._meta.related_objects:
483 if field.is_cached(self):
484 field.delete_cached_value(self)
485
486 self._state.db = db_instance._state.db
487
    def serializable_value(self, field_name):
        """
        Return the value of the field name for this instance. If the field is
        a foreign key, return the id value instead of the object. If there's
        no Field object with this name on the model, return the model
        attribute's value.

        Used to serialize a field's value (in the serializer, or form output,
        for example). Normally, you would just access the attribute directly
        and not use this method.
        """
        try:
            field = self._meta.get_field(field_name)
        except FieldDoesNotExist:
            # Not a model field: fall back to the plain attribute.
            return getattr(self, field_name)
        # attname is the raw column attribute (e.g. "user_id" for FK "user").
        return getattr(self, field.attname)
504
    def save(
        self,
        *,
        clean_and_validate=True,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.

        When 'clean_and_validate' is true, full_clean() runs first (excluding
        deferred fields). 'update_fields' restricts the write to the named
        concrete, non-PK fields; an empty iterable makes save() a no-op.
        """
        self._prepare_related_fields_for_save(operation_name="save")

        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif not force_insert and deferred_fields and using == self._state.db:
            field_names = set()
            for field in self._meta.concrete_fields:
                # Skip the PK and m2m-style fields (those expose "through").
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )
568
    def save_base(
        self,
        *,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.

        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        # Internal invariants (already enforced by save()); note asserts are
        # stripped under `python -O`.
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        # Roll back the surrounding transaction if the write fails.
        with transaction.mark_for_rollback_on_error(using=using):
            self._save_table(
                raw,
                cls,
                force_insert,
                force_update,
                using,
                update_fields,
            )
        # Store the database on which the object was saved
        self._state.db = using
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False
605
    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.

        Returns True if an UPDATE touched a row, False otherwise (including
        when an INSERT was performed instead).
        """
        meta = cls._meta
        non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]

        if update_fields:
            # Restrict the UPDATE to the requested fields only.
            non_pks = [
                f
                for f in non_pks
                if f.name in update_fields or f.attname in update_fields
            ]

        pk_val = self._get_pk_val(meta)
        if pk_val is None:
            # Give the PK field a chance to generate a value (e.g. a default)
            # before the write.
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = pk_val is not None
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and self._state.adding
            and meta.pk.default
            and meta.pk.default is not NOT_PROVIDED
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager.using(using)
            # (field, model, value) triples; raw saves bypass pre_save().
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks
            ]
            forced_update = update_fields or force_update
            updated = self._do_update(
                base_qs, using, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            # Fall back to INSERT; omit the auto PK column when unset so the
            # database can assign it.
            fields = meta.local_concrete_fields
            if not pk_set:
                fields = [f for f in fields if f is not meta.auto_field]

            returning_fields = meta.db_returning_fields
            results = self._do_insert(
                cls._base_manager, using, fields, returning_fields, raw
            )
            if results:
                # Copy DB-generated values (e.g. the new PK) back onto self.
                for value, field in zip(results[0], returning_fields):
                    setattr(self, field.attname, value)
        return updated
678
679 def _do_update(self, base_qs, using, pk_val, values, update_fields, forced_update):
680 """
681 Try to update the model. Return True if the model was updated (if an
682 update query was done and a matching row was found in the DB).
683 """
684 filtered = base_qs.filter(pk=pk_val)
685 if not values:
686 # We can end up here when saving a model in inheritance chain where
687 # update_fields doesn't target any field in current model. In that
688 # case we just say the update succeeded. Another case ending up here
689 # is a model with just PK - in that case check that the PK still
690 # exists.
691 return update_fields is not None or filtered.exists()
692 return filtered._update(values) > 0
693
    def _do_insert(self, manager, using, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
        # Delegate to the manager's bulk-insert path with a single instance.
        return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            using=using,
            raw=raw,
        )
706
    def _prepare_related_fields_for_save(self, operation_name, fields=None):
        """
        Guard against saving with an unsaved related object assigned, and
        sync FK id attributes with related instances saved after assignment.
        """
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKey on this model. If the field is nullable, allowing the save would result in silent data loss.
        for field in self._meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if field.is_relation and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.pk is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)
743
744 def delete(self, using=None):
745 if self.pk is None:
746 raise ValueError(
747 f"{self._meta.object_name} object can't be deleted because its {self._meta.pk.attname} attribute is set "
748 "to None."
749 )
750 using = using or router.db_for_write(self.__class__, instance=self)
751 collector = Collector(using=using, origin=self)
752 collector.collect([self])
753 return collector.delete()
754
755 def _get_FIELD_display(self, field):
756 value = getattr(self, field.attname)
757 choices_dict = dict(make_hashable(field.flatchoices))
758 # force_str() to coerce lazy strings.
759 return force_str(
760 choices_dict.get(make_hashable(value), value), strings_only=True
761 )
762
    def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
        """
        Return the neighboring instance ordered by `field` (next when
        is_next, else previous), using pk as a tiebreaker for equal values.
        Raises DoesNotExist when there is no such neighbor.
        """
        if not self.pk:
            raise ValueError("get_next/get_previous cannot be used on unsaved objects.")
        op = "gt" if is_next else "lt"
        order = "" if is_next else "-"
        param = getattr(self, field.attname)
        # Either a strictly greater/lesser field value, or an equal field
        # value with a greater/lesser pk (stable ordering for duplicates).
        q = Q.create([(field.name, param), (f"pk__{op}", self.pk)], connector=Q.AND)
        q = Q.create([q, (f"{field.name}__{op}", param)], connector=Q.OR)
        qs = (
            self.__class__._default_manager.using(self._state.db)
            .filter(**kwargs)
            .filter(q)
            .order_by(f"{order}{field.name}", f"{order}pk")
        )
        try:
            return qs[0]
        except IndexError:
            raise self.DoesNotExist(
                f"{self.__class__._meta.object_name} matching query does not exist."
            )
783
784 def _get_field_value_map(self, meta, exclude=None):
785 if exclude is None:
786 exclude = set()
787 meta = meta or self._meta
788 return {
789 field.name: Value(getattr(self, field.attname), field)
790 for field in meta.local_concrete_fields
791 if field.name not in exclude
792 }
793
794 def prepare_database_save(self, field):
795 if self.pk is None:
796 raise ValueError(
797 f"Unsaved model instance {self!r} cannot be used in an ORM query."
798 )
799 return getattr(self, field.remote_field.get_related_field().attname)
800
    def clean(self):
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.
        """
        # Intentionally a no-op; subclasses override to add model-level checks.
        pass
809
810 def validate_unique(self, exclude=None):
811 """
812 Check unique constraints on the model and raise ValidationError if any
813 failed.
814 """
815 unique_checks = self._get_unique_checks(exclude=exclude)
816
817 if errors := self._perform_unique_checks(unique_checks):
818 raise ValidationError(errors)
819
820 def _get_unique_checks(self, exclude=None):
821 """
822 Return a list of checks to perform. Since validate_unique() could be
823 called from a ModelForm, some fields may have been excluded; we can't
824 perform a unique check on a model that is missing fields involved
825 in that check. Fields that did not validate should also be excluded,
826 but they need to be passed in via the exclude argument.
827 """
828 if exclude is None:
829 exclude = set()
830 unique_checks = []
831
832 # Gather a list of checks for fields declared as unique and add them to
833 # the list of checks.
834
835 fields_with_class = [(self.__class__, self._meta.local_fields)]
836
837 for model_class, fields in fields_with_class:
838 for f in fields:
839 name = f.name
840 if name in exclude:
841 continue
842 if f.primary_key:
843 unique_checks.append((model_class, (name,)))
844
845 return unique_checks
846
    def _perform_unique_checks(self, unique_checks):
        """
        Run the given (model_class, field_names) uniqueness checks against
        the database and return a dict of field name -> [ValidationError].
        """
        errors = {}

        for model_class, unique_check in unique_checks:
            # Try to look up an existing object with the same values as this
            # object's values for all the unique field.

            lookup_kwargs = {}
            for field_name in unique_check:
                f = self._meta.get_field(field_name)
                lookup_value = getattr(self, f.attname)
                # TODO: Handle multiple backends with different feature flags.
                if lookup_value is None or (
                    lookup_value == ""
                    and connection.features.interprets_empty_strings_as_nulls
                ):
                    # no value, skip the lookup
                    continue
                if f.primary_key and not self._state.adding:
                    # no need to check for unique primary key when editing
                    continue
                lookup_kwargs[str(field_name)] = lookup_value

            # some fields were skipped, no reason to do the check
            if len(unique_check) != len(lookup_kwargs):
                continue

            qs = model_class._default_manager.filter(**lookup_kwargs)

            # Exclude the current object from the query if we are editing an
            # instance (as opposed to creating a new one)
            # Note that we need to use the pk as defined by model_class, not
            # self.pk. These can be different fields because model inheritance
            # allows single model to have effectively multiple primary keys.
            # Refs #17615.
            model_class_pk = self._get_pk_val(model_class._meta)
            if not self._state.adding and model_class_pk is not None:
                qs = qs.exclude(pk=model_class_pk)
            if qs.exists():
                # Single-field checks blame that field; multi-field checks
                # are reported as non-field errors.
                if len(unique_check) == 1:
                    key = unique_check[0]
                else:
                    key = NON_FIELD_ERRORS
                errors.setdefault(key, []).append(
                    self.unique_error_message(model_class, unique_check)
                )

        return errors
895
896 def unique_error_message(self, model_class, unique_check):
897 opts = model_class._meta
898
899 params = {
900 "model": self,
901 "model_class": model_class,
902 "model_name": opts.model_name,
903 "unique_check": unique_check,
904 }
905
906 if len(unique_check) == 1:
907 field = opts.get_field(unique_check[0])
908 params["field_label"] = field.name
909 return ValidationError(
910 message=field.error_messages["unique"],
911 code="unique",
912 params=params,
913 )
914 else:
915 field_names = [opts.get_field(f).name for f in unique_check]
916
917 # Put an "and" before the last one
918 field_names[-1] = f"and {field_names[-1]}"
919
920 if len(field_names) > 2:
921 # Comma join if more than 2
922 params["field_label"] = ", ".join(field_names)
923 else:
924 # Just a space if there are only 2
925 params["field_label"] = " ".join(field_names)
926
927 # Use the first field as the message format...
928 message = opts.get_field(unique_check[0]).error_messages["unique"]
929
930 return ValidationError(
931 message=message,
932 code="unique",
933 params=params,
934 )
935
936 def get_constraints(self):
937 constraints = [(self.__class__, self._meta.constraints)]
938 return constraints
939
940 def validate_constraints(self, exclude=None):
941 constraints = self.get_constraints()
942 using = router.db_for_write(self.__class__, instance=self)
943
944 errors = {}
945 for model_class, model_constraints in constraints:
946 for constraint in model_constraints:
947 try:
948 constraint.validate(model_class, self, exclude=exclude, using=using)
949 except ValidationError as e:
950 if (
951 getattr(e, "code", None) == "unique"
952 and len(constraint.fields) == 1
953 ):
954 errors.setdefault(constraint.fields[0], []).append(e)
955 else:
956 errors = e.update_error_dict(errors)
957 if errors:
958 raise ValidationError(errors)
959
960 def full_clean(
961 self, *, exclude=None, validate_unique=True, validate_constraints=True
962 ):
963 """
964 Call clean_fields(), clean(), validate_unique(), and
965 validate_constraints() on the model. Raise a ValidationError for any
966 errors that occur.
967 """
968 errors = {}
969 if exclude is None:
970 exclude = set()
971 else:
972 exclude = set(exclude)
973
974 try:
975 self.clean_fields(exclude=exclude)
976 except ValidationError as e:
977 errors = e.update_error_dict(errors)
978
979 # Form.clean() is run even if other validation fails, so do the
980 # same with Model.clean() for consistency.
981 try:
982 self.clean()
983 except ValidationError as e:
984 errors = e.update_error_dict(errors)
985
986 # Run unique checks, but only for fields that passed validation.
987 if validate_unique:
988 for name in errors:
989 if name != NON_FIELD_ERRORS and name not in exclude:
990 exclude.add(name)
991 try:
992 self.validate_unique(exclude=exclude)
993 except ValidationError as e:
994 errors = e.update_error_dict(errors)
995
996 # Run constraints checks, but only for fields that passed validation.
997 if validate_constraints:
998 for name in errors:
999 if name != NON_FIELD_ERRORS and name not in exclude:
1000 exclude.add(name)
1001 try:
1002 self.validate_constraints(exclude=exclude)
1003 except ValidationError as e:
1004 errors = e.update_error_dict(errors)
1005
1006 if errors:
1007 raise ValidationError(errors)
1008
1009 def clean_fields(self, exclude=None):
1010 """
1011 Clean all fields and raise a ValidationError containing a dict
1012 of all validation errors if any occur.
1013 """
1014 if exclude is None:
1015 exclude = set()
1016
1017 errors = {}
1018 for f in self._meta.fields:
1019 if f.name in exclude:
1020 continue
1021 # Skip validation for empty fields with required=False. The developer
1022 # is responsible for making sure they have a valid value.
1023 raw_value = getattr(self, f.attname)
1024 if not f.required and raw_value in f.empty_values:
1025 continue
1026 try:
1027 setattr(self, f.attname, f.clean(raw_value, self))
1028 except ValidationError as e:
1029 errors[f.name] = e.error_list
1030
1031 if errors:
1032 raise ValidationError(errors)
1033
1034 @classmethod
1035 def check(cls, **kwargs):
1036 errors = [
1037 *cls._check_managers(**kwargs),
1038 ]
1039
1040 databases = kwargs.get("databases") or []
1041 errors += [
1042 *cls._check_fields(**kwargs),
1043 *cls._check_m2m_through_same_relationship(),
1044 *cls._check_long_column_names(databases),
1045 ]
1046 clash_errors = (
1047 *cls._check_id_field(),
1048 *cls._check_field_name_clashes(),
1049 *cls._check_model_name_db_lookup_clashes(),
1050 *cls._check_property_name_related_field_accessor_clashes(),
1051 *cls._check_single_primary_key(),
1052 )
1053 errors.extend(clash_errors)
1054 # If there are field name clashes, hide consequent column name
1055 # clashes.
1056 if not clash_errors:
1057 errors.extend(cls._check_column_name_clashes())
1058 errors += [
1059 *cls._check_indexes(databases),
1060 *cls._check_ordering(),
1061 *cls._check_constraints(databases),
1062 *cls._check_db_table_comment(databases),
1063 ]
1064
1065 return errors
1066
1067 @classmethod
1068 def _check_db_table_comment(cls, databases):
1069 if not cls._meta.db_table_comment:
1070 return []
1071 errors = []
1072 for db in databases:
1073 if not router.allow_migrate_model(db, cls):
1074 continue
1075 connection = connections[db]
1076 if not (
1077 connection.features.supports_comments
1078 or "supports_comments" in cls._meta.required_db_features
1079 ):
1080 errors.append(
1081 preflight.Warning(
1082 f"{connection.display_name} does not support comments on "
1083 f"tables (db_table_comment).",
1084 obj=cls,
1085 id="models.W046",
1086 )
1087 )
1088 return errors
1089
1090 @classmethod
1091 def _check_managers(cls, **kwargs):
1092 """Perform all manager checks."""
1093 errors = []
1094 for manager in cls._meta.managers:
1095 errors.extend(manager.check(**kwargs))
1096 return errors
1097
1098 @classmethod
1099 def _check_fields(cls, **kwargs):
1100 """Perform all field checks."""
1101 errors = []
1102 for field in cls._meta.local_fields:
1103 errors.extend(field.check(**kwargs))
1104 for field in cls._meta.local_many_to_many:
1105 errors.extend(field.check(from_model=cls, **kwargs))
1106 return errors
1107
1108 @classmethod
1109 def _check_m2m_through_same_relationship(cls):
1110 """Check if no relationship model is used by more than one m2m field."""
1111
1112 errors = []
1113 seen_intermediary_signatures = []
1114
1115 fields = cls._meta.local_many_to_many
1116
1117 # Skip when the target model wasn't found.
1118 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
1119
1120 # Skip when the relationship model wasn't found.
1121 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
1122
1123 for f in fields:
1124 signature = (
1125 f.remote_field.model,
1126 cls,
1127 f.remote_field.through,
1128 f.remote_field.through_fields,
1129 )
1130 if signature in seen_intermediary_signatures:
1131 errors.append(
1132 preflight.Error(
1133 "The model has two identical many-to-many relations "
1134 f"through the intermediate model '{f.remote_field.through._meta.label}'.",
1135 obj=cls,
1136 id="models.E003",
1137 )
1138 )
1139 else:
1140 seen_intermediary_signatures.append(signature)
1141 return errors
1142
1143 @classmethod
1144 def _check_id_field(cls):
1145 """Check if `id` field is a primary key."""
1146 fields = [
1147 f for f in cls._meta.local_fields if f.name == "id" and f != cls._meta.pk
1148 ]
1149 # fields is empty or consists of the invalid "id" field
1150 if fields and not fields[0].primary_key and cls._meta.pk.name == "id":
1151 return [
1152 preflight.Error(
1153 "'id' can only be used as a field name if the field also "
1154 "sets 'primary_key=True'.",
1155 obj=cls,
1156 id="models.E004",
1157 )
1158 ]
1159 else:
1160 return []
1161
1162 @classmethod
1163 def _check_field_name_clashes(cls):
1164 """Forbid field shadowing in multi-table inheritance."""
1165 errors = []
1166 used_fields = {} # name or attname -> field
1167
1168 for f in cls._meta.local_fields:
1169 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1170 # Note that we may detect clash between user-defined non-unique
1171 # field "id" and automatically added unique field "id", both
1172 # defined at the same model. This special case is considered in
1173 # _check_id_field and here we ignore it.
1174 id_conflict = (
1175 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1176 )
1177 if clash and not id_conflict:
1178 errors.append(
1179 preflight.Error(
1180 f"The field '{f.name}' clashes with the field '{clash.name}' "
1181 f"from model '{clash.model._meta}'.",
1182 obj=f,
1183 id="models.E006",
1184 )
1185 )
1186 used_fields[f.name] = f
1187 used_fields[f.attname] = f
1188
1189 return errors
1190
1191 @classmethod
1192 def _check_column_name_clashes(cls):
1193 # Store a list of column names which have already been used by other fields.
1194 used_column_names = []
1195 errors = []
1196
1197 for f in cls._meta.local_fields:
1198 _, column_name = f.get_attname_column()
1199
1200 # Ensure the column name is not already in use.
1201 if column_name and column_name in used_column_names:
1202 errors.append(
1203 preflight.Error(
1204 f"Field '{f.name}' has column name '{column_name}' that is used by "
1205 "another field.",
1206 hint="Specify a 'db_column' for the field.",
1207 obj=cls,
1208 id="models.E007",
1209 )
1210 )
1211 else:
1212 used_column_names.append(column_name)
1213
1214 return errors
1215
1216 @classmethod
1217 def _check_model_name_db_lookup_clashes(cls):
1218 errors = []
1219 model_name = cls.__name__
1220 if model_name.startswith("_") or model_name.endswith("_"):
1221 errors.append(
1222 preflight.Error(
1223 f"The model name '{model_name}' cannot start or end with an underscore "
1224 "as it collides with the query lookup syntax.",
1225 obj=cls,
1226 id="models.E023",
1227 )
1228 )
1229 elif LOOKUP_SEP in model_name:
1230 errors.append(
1231 preflight.Error(
1232 f"The model name '{model_name}' cannot contain double underscores as "
1233 "it collides with the query lookup syntax.",
1234 obj=cls,
1235 id="models.E024",
1236 )
1237 )
1238 return errors
1239
1240 @classmethod
1241 def _check_property_name_related_field_accessor_clashes(cls):
1242 errors = []
1243 property_names = cls._meta._property_names
1244 related_field_accessors = (
1245 f.get_attname()
1246 for f in cls._meta._get_fields(reverse=False)
1247 if f.is_relation and f.related_model is not None
1248 )
1249 for accessor in related_field_accessors:
1250 if accessor in property_names:
1251 errors.append(
1252 preflight.Error(
1253 f"The property '{accessor}' clashes with a related field "
1254 "accessor.",
1255 obj=cls,
1256 id="models.E025",
1257 )
1258 )
1259 return errors
1260
1261 @classmethod
1262 def _check_single_primary_key(cls):
1263 errors = []
1264 if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1:
1265 errors.append(
1266 preflight.Error(
1267 "The model cannot have more than one field with "
1268 "'primary_key=True'.",
1269 obj=cls,
1270 id="models.E026",
1271 )
1272 )
1273 return errors
1274
    @classmethod
    def _check_indexes(cls, databases):
        """Check fields, names, and conditions of indexes.

        Returns a list of preflight messages: invalid index names
        (models.E033/E034), per-database warnings for unsupported index
        features (models.W037/W040/W043), and errors for fields referenced
        by indexes that don't exist locally (via _check_local_fields).
        """
        errors = []
        references = set()
        for index in cls._meta.indexes:
            # Index name can't start with an underscore or a number, restricted
            # for cross-database compatibility with Oracle.
            if index.name[0] == "_" or index.name[0].isdigit():
                errors.append(
                    preflight.Error(
                        f"The index name '{index.name}' cannot start with an underscore "
                        "or a number.",
                        obj=cls,
                        id="models.E033",
                    ),
                )
            if len(index.name) > index.max_name_length:
                errors.append(
                    preflight.Error(
                        "The index name '%s' cannot be longer than %d "  # noqa: UP031
                        "characters." % (index.name, index.max_name_length),
                        obj=cls,
                        id="models.E034",
                    ),
                )
            # Collect the field names referenced by expression indexes so they
            # can be validated alongside plain index fields below.
            if index.contains_expressions:
                for expression in index.expressions:
                    references.update(
                        ref[0] for ref in cls._get_expr_references(expression)
                    )
        # Warn once per database that lacks support for an index feature this
        # model actually uses (unless the feature is declared required).
        for db in databases:
            if not router.allow_migrate_model(db, cls):
                continue
            connection = connections[db]
            if not (
                connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            ) and any(index.condition is not None for index in cls._meta.indexes):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support indexes with conditions.",
                        hint=(
                            "Conditions will be ignored. Silence this warning "
                            "if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W037",
                    )
                )
            if not (
                connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            ) and any(index.include for index in cls._meta.indexes):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support indexes with non-key columns.",
                        hint=(
                            "Non-key columns will be ignored. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W040",
                    )
                )
            if not (
                connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            ) and any(index.contains_expressions for index in cls._meta.indexes):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support indexes on expressions.",
                        hint=(
                            "An index won't be created. Silence this warning "
                            "if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W043",
                    )
                )
        # Every name an index touches — ordered fields, covering (include)
        # columns, and expression references — must be a local field.
        fields = [
            field for index in cls._meta.indexes for field, _ in index.fields_orders
        ]
        fields += [include for index in cls._meta.indexes for include in index.include]
        fields += references
        errors.extend(cls._check_local_fields(fields, "indexes"))
        return errors
1362
1363 @classmethod
1364 def _check_local_fields(cls, fields, option):
1365 from plain import models
1366
1367 # In order to avoid hitting the relation tree prematurely, we use our
1368 # own fields_map instead of using get_field()
1369 forward_fields_map = {}
1370 for field in cls._meta._get_fields(reverse=False):
1371 forward_fields_map[field.name] = field
1372 if hasattr(field, "attname"):
1373 forward_fields_map[field.attname] = field
1374
1375 errors = []
1376 for field_name in fields:
1377 try:
1378 field = forward_fields_map[field_name]
1379 except KeyError:
1380 errors.append(
1381 preflight.Error(
1382 f"'{option}' refers to the nonexistent field '{field_name}'.",
1383 obj=cls,
1384 id="models.E012",
1385 )
1386 )
1387 else:
1388 if isinstance(field.remote_field, models.ManyToManyRel):
1389 errors.append(
1390 preflight.Error(
1391 f"'{option}' refers to a ManyToManyField '{field_name}', but "
1392 f"ManyToManyFields are not permitted in '{option}'.",
1393 obj=cls,
1394 id="models.E013",
1395 )
1396 )
1397 elif field not in cls._meta.local_fields:
1398 errors.append(
1399 preflight.Error(
1400 f"'{option}' refers to field '{field_name}' which is not local to model "
1401 f"'{cls._meta.object_name}'.",
1402 hint="This issue may be caused by multi-table inheritance.",
1403 obj=cls,
1404 id="models.E016",
1405 )
1406 )
1407 return errors
1408
    @classmethod
    def _check_ordering(cls):
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?

        Returns models.E014 when ordering is not a list/tuple, and
        models.E015 for each ordering entry that names a nonexistent field,
        related field, or lookup.
        """

        if not cls._meta.ordering:
            return []

        if not isinstance(cls._meta.ordering, list | tuple):
            return [
                preflight.Error(
                    "'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="models.E014",
                )
            ]

        errors = []
        fields = cls._meta.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields: walk each lookup path one part at a time,
        # following relations into the target model's options.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    # pk is an alias that won't be found by opts.get_field.
                    if part == "pk":
                        fld = _cls._meta.pk
                    else:
                        fld = _cls._meta.get_field(part)
                    if fld.is_relation:
                        _cls = fld.path_infos[-1].to_opts.model
                    else:
                        # A non-relational part can only be followed by a
                        # transform/lookup, not another field; _cls=None makes
                        # the next get_field attempt raise AttributeError.
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    # The part may still be a valid transform or lookup on the
                    # previous field; only report if it is neither.
                    if fld is None or (
                        fld.get_transform(part) is None and fld.get_lookup(part) is None
                    ):
                        errors.append(
                            preflight.Error(
                                "'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="models.E015",
                            )
                        )

        # Skip ordering on pk. This is always a valid order_by field
        # but is an alias and therefore won't be found by opts.get_field.
        fields = {f for f in fields if f != "pk"}

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        opts = cls._meta
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(opts.fields, opts.related_objects)
            )
        )

        invalid_fields.extend(fields - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                preflight.Error(
                    "'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="models.E015",
                )
            )
        return errors
1507
    @classmethod
    def _check_long_column_names(cls, databases):
        """
        Check that any auto-generated column names are shorter than the limits
        for each database in which the model will be created.

        Returns models.E018 for plain fields and models.E019 for columns on
        an auto-created M2M through model. Explicit db_column names are the
        developer's responsibility and are not checked.
        """
        if not databases:
            return []
        errors = []
        allowed_len = None
        db_alias = None

        # Find the minimum max allowed length among all specified db_aliases.
        for db in databases:
            # skip databases where the model won't be created
            if not router.allow_migrate_model(db, cls):
                continue
            connection = connections[db]
            max_name_length = connection.ops.max_name_length()
            # Databases that silently truncate names (or have no limit)
            # impose no constraint here.
            if max_name_length is None or connection.features.truncates_names:
                continue
            else:
                if allowed_len is None:
                    allowed_len = max_name_length
                    db_alias = db
                elif max_name_length < allowed_len:
                    allowed_len = max_name_length
                    db_alias = db

        # No database imposed a limit; nothing to check.
        if allowed_len is None:
            return errors

        for f in cls._meta.local_fields:
            _, column_name = f.get_attname_column()

            # Check if auto-generated name for the field is too long
            # for the database.
            if (
                f.db_column is None
                and column_name is not None
                and len(column_name) > allowed_len
            ):
                errors.append(
                    preflight.Error(
                        f'Autogenerated column name too long for field "{column_name}". '
                        f'Maximum length is "{allowed_len}" for database "{db_alias}".',
                        hint="Set the column name manually using 'db_column'.",
                        obj=cls,
                        id="models.E018",
                    )
                )

        for f in cls._meta.local_many_to_many:
            # Skip nonexistent models.
            if isinstance(f.remote_field.through, str):
                continue

            # Check if auto-generated name for the M2M field is too long
            # for the database.
            for m2m in f.remote_field.through._meta.local_fields:
                _, rel_name = m2m.get_attname_column()
                if (
                    m2m.db_column is None
                    and rel_name is not None
                    and len(rel_name) > allowed_len
                ):
                    errors.append(
                        preflight.Error(
                            "Autogenerated column name too long for M2M field "
                            f'"{rel_name}". Maximum length is "{allowed_len}" for database "{db_alias}".',
                            hint=(
                                "Use 'through' to create a separate model for "
                                "M2M and then set column_name using 'db_column'."
                            ),
                            obj=cls,
                            id="models.E019",
                        )
                    )

        return errors
1588
    @classmethod
    def _get_expr_references(cls, expr):
        """Yield every field reference in ``expr`` as a tuple of lookup parts.

        Recurses into Q children (both (lookup, value) tuples and nested Q
        objects), F expressions, and anything exposing
        get_source_expressions().
        """
        if isinstance(expr, Q):
            for child in expr.children:
                if isinstance(child, tuple):
                    lookup, value = child
                    yield tuple(lookup.split(LOOKUP_SEP))
                    # The value side may itself contain expressions (e.g. F).
                    yield from cls._get_expr_references(value)
                else:
                    yield from cls._get_expr_references(child)
        elif isinstance(expr, F):
            yield tuple(expr.name.split(LOOKUP_SEP))
        elif hasattr(expr, "get_source_expressions"):
            for src_expr in expr.get_source_expressions():
                yield from cls._get_expr_references(src_expr)
1604
    @classmethod
    def _check_constraints(cls, databases):
        """Check the model's constraints against each database's features.

        Emits warnings (models.W027/W036/W038/W039/W044/W045) for constraint
        features a database cannot enforce, and errors (models.E041 plus the
        results of _check_local_fields) for invalid field references inside
        constraints.
        """
        errors = []
        for db in databases:
            if not router.allow_migrate_model(db, cls):
                continue
            connection = connections[db]
            # Each feature warning is skipped when the feature is declared in
            # required_db_features (the developer opted in knowingly).
            if not (
                connection.features.supports_table_check_constraints
                or "supports_table_check_constraints" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, CheckConstraint)
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support check constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W027",
                    )
                )
            if not (
                connection.features.supports_partial_indexes
                or "supports_partial_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.condition is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support unique constraints with "
                        "conditions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W036",
                    )
                )
            if not (
                connection.features.supports_deferrable_unique_constraints
                or "supports_deferrable_unique_constraints"
                in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.deferrable is not None
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support deferrable unique constraints.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W038",
                    )
                )
            if not (
                connection.features.supports_covering_indexes
                or "supports_covering_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint) and constraint.include
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support unique constraints with non-key "
                        "columns.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W039",
                    )
                )
            if not (
                connection.features.supports_expression_indexes
                or "supports_expression_indexes" in cls._meta.required_db_features
            ) and any(
                isinstance(constraint, UniqueConstraint)
                and constraint.contains_expressions
                for constraint in cls._meta.constraints
            ):
                errors.append(
                    preflight.Warning(
                        f"{connection.display_name} does not support unique constraints on "
                        "expressions.",
                        hint=(
                            "A constraint won't be created. Silence this "
                            "warning if you don't care about it."
                        ),
                        obj=cls,
                        id="models.W044",
                    )
                )
            # Collect every field name a UniqueConstraint touches directly
            # (fields and covering include columns)...
            fields = set(
                chain.from_iterable(
                    (*constraint.fields, *constraint.include)
                    for constraint in cls._meta.constraints
                    if isinstance(constraint, UniqueConstraint)
                )
            )
            # ...plus field references pulled out of conditions, expressions,
            # and check clauses (only when the database will enforce them or
            # the feature is not declared required).
            references = set()
            for constraint in cls._meta.constraints:
                if isinstance(constraint, UniqueConstraint):
                    if (
                        connection.features.supports_partial_indexes
                        or "supports_partial_indexes"
                        not in cls._meta.required_db_features
                    ) and isinstance(constraint.condition, Q):
                        references.update(
                            cls._get_expr_references(constraint.condition)
                        )
                    if (
                        connection.features.supports_expression_indexes
                        or "supports_expression_indexes"
                        not in cls._meta.required_db_features
                    ) and constraint.contains_expressions:
                        for expression in constraint.expressions:
                            references.update(cls._get_expr_references(expression))
                elif isinstance(constraint, CheckConstraint):
                    if (
                        connection.features.supports_table_check_constraints
                        or "supports_table_check_constraints"
                        not in cls._meta.required_db_features
                    ):
                        if isinstance(constraint.check, Q):
                            references.update(
                                cls._get_expr_references(constraint.check)
                            )
                        # RawSQL can't be introspected, so full_clean() can't
                        # validate it; warn the developer.
                        if any(
                            isinstance(expr, RawSQL)
                            for expr in constraint.check.flatten()
                        ):
                            errors.append(
                                preflight.Warning(
                                    f"Check constraint {constraint.name!r} contains "
                                    f"RawSQL() expression and won't be validated "
                                    f"during the model full_clean().",
                                    hint=(
                                        "Silence this warning if you don't care about "
                                        "it."
                                    ),
                                    obj=cls,
                                    id="models.W045",
                                ),
                            )
            # Resolve each reference: the base field must be local, and any
            # trailing parts must be transforms/lookups, never joins.
            for field_name, *lookups in references:
                # pk is an alias that won't be found by opts.get_field.
                if field_name != "pk":
                    fields.add(field_name)
                if not lookups:
                    # If it has no lookups it cannot result in a JOIN.
                    continue
                try:
                    if field_name == "pk":
                        field = cls._meta.pk
                    else:
                        field = cls._meta.get_field(field_name)
                    if not field.is_relation or field.many_to_many or field.one_to_many:
                        continue
                except FieldDoesNotExist:
                    continue
                # JOIN must happen at the first lookup.
                first_lookup = lookups[0]
                if (
                    hasattr(field, "get_transform")
                    and hasattr(field, "get_lookup")
                    and field.get_transform(first_lookup) is None
                    and field.get_lookup(first_lookup) is None
                ):
                    errors.append(
                        preflight.Error(
                            f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                            obj=cls,
                            id="models.E041",
                        )
                    )
            errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors
1794
1795
1796########
1797# MISC #
1798########
1799
1800
def model_unpickle(model_id):
    """Used to unpickle Model subclasses with deferred fields."""
    if not isinstance(model_id, tuple):
        # Backwards compat - the model was cached directly in earlier versions.
        model = model_id
    else:
        model = models_registry.get_model(*model_id)
    # Bypass __init__; pickle restores instance state afterwards.
    return model.__new__(model)


model_unpickle.__safe_for_unpickle__ = True