import functools
import inspect
from functools import partial

from plain import exceptions, preflight
from plain.models.backends import utils
from plain.models.constants import LOOKUP_SEP
from plain.models.db import connection, router
from plain.models.deletion import CASCADE, SET_DEFAULT, SET_NULL
from plain.models.query_utils import PathInfo, Q
from plain.models.utils import make_model_tuple
from plain.packages import packages
from plain.runtime import settings
from plain.runtime.user_settings import SettingsReference
from plain.utils.functional import cached_property

from . import Field
from .mixins import FieldCacheMixin
from .related_descriptors import (
    ForeignKeyDeferredAttribute,
    ForwardManyToOneDescriptor,
    ForwardOneToOneDescriptor,
    ManyToManyDescriptor,
    ReverseManyToOneDescriptor,
    ReverseOneToOneDescriptor,
)
from .related_lookups import (
    RelatedExact,
    RelatedGreaterThan,
    RelatedGreaterThanOrEqual,
    RelatedIn,
    RelatedIsNull,
    RelatedLessThan,
    RelatedLessThanOrEqual,
)
from .reverse_related import ForeignObjectRel, ManyToManyRel, ManyToOneRel, OneToOneRel
37
# Relation string that stands for "the model declaring the field"; it is
# swapped for the scope model by resolve_relation().
RECURSIVE_RELATIONSHIP_CONSTANT = "self"
39
40
def resolve_relation(scope_model, relation):
    """
    Transform relation into a model or fully-qualified model string of the form
    "package_label.ModelName", relative to scope_model.

    The relation argument can be:
      * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
        scope_model will be returned.
      * A bare model name without a package_label, in which case scope_model's
        package_label will be prepended.
      * A "package_label.ModelName" string, which will be returned unchanged.
      * A model class, which will be returned unchanged.
    """
    # A recursive relation points back at the model that declares it.
    if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
        relation = scope_model

    # A string without a dot is a bare model name: qualify it with the
    # package label of the scope model.
    if isinstance(relation, str) and "." not in relation:
        relation = f"{scope_model._meta.package_label}.{relation}"

    return relation
64
65
def lazy_related_operation(function, model, *related_models, **kwargs):
    """
    Schedule `function` to be called once `model` and all `related_models`
    have been imported and registered with the package registry. `function`
    will be called with the newly-loaded model classes as its positional
    arguments, plus any optional keyword arguments.

    The `model` argument must be a model class. Each subsequent positional
    argument is another model, or a reference to another model - see
    `resolve_relation()` for the various forms these may take. Any relative
    references will be resolved relative to `model`.

    This is a convenience wrapper for `lazy_model_operation` on the registry
    found in `model._meta.packages`; its return value is passed through.
    """
    models = [model] + [resolve_relation(model, rel) for rel in related_models]
    model_keys = (make_model_tuple(m) for m in models)
    # Named `registry` so the module-level `packages` import is not shadowed.
    registry = model._meta.packages
    return registry.lazy_model_operation(partial(function, **kwargs), *model_keys)
85
86
class RelatedField(FieldCacheMixin, Field):
    """Base class that all relational fields inherit from."""

    # Field flags
    one_to_many = False
    one_to_one = False
    many_to_many = False
    many_to_one = False

    def __init__(
        self,
        related_name=None,
        related_query_name=None,
        limit_choices_to=None,
        **kwargs,
    ):
        # Keep the constructor arguments as given; deconstruct() reads these
        # private copies back.
        self._related_name = related_name
        self._related_query_name = related_query_name
        self._limit_choices_to = limit_choices_to
        super().__init__(**kwargs)

    @cached_property
    def related_model(self):
        """Return the model on the other side of the relation."""
        # Can't cache this property until all the models are loaded.
        packages.check_models_ready()
        return self.remote_field.model

    def check(self, **kwargs):
        """Return the parent class's check results plus the relation checks."""
        return [
            *super().check(**kwargs),
            *self._check_related_name_is_valid(),
            *self._check_related_query_name_is_valid(),
            *self._check_relation_model_exists(),
            *self._check_referencing_to_swapped_model(),
            *self._check_clashes(),
        ]

    def _check_related_name_is_valid(self):
        """Check that related_name is a Python identifier or ends with '+'."""
        import keyword

        related_name = self.remote_field.related_name
        if related_name is None:
            return []
        is_valid_id = (
            not keyword.iskeyword(related_name) and related_name.isidentifier()
        )
        if is_valid_id or related_name.endswith("+"):
            return []
        return [
            preflight.Error(
                f"The name '{related_name}' is invalid related_name for field "
                f"{self.model._meta.object_name}.{self.name}",
                hint=(
                    "Related name must be a valid Python identifier or end with a "
                    "'+'"
                ),
                obj=self,
                id="fields.E306",
            )
        ]

    def _check_related_query_name_is_valid(self):
        """Check the reverse query name for a trailing '_' or a LOOKUP_SEP."""
        if self.remote_field.is_hidden():
            return []
        rel_query_name = self.related_query_name()
        # Both errors below share the same hint text.
        hint = (
            "Add or change a related_name or related_query_name "
            "argument for this field."
        )
        errors = []
        if rel_query_name.endswith("_"):
            errors.append(
                preflight.Error(
                    f"Reverse query name '{rel_query_name}' must not end with an "
                    "underscore.",
                    hint=hint,
                    obj=self,
                    id="fields.E308",
                )
            )
        if LOOKUP_SEP in rel_query_name:
            errors.append(
                preflight.Error(
                    f"Reverse query name '{rel_query_name}' must not contain "
                    f"'{LOOKUP_SEP}'.",
                    hint=hint,
                    obj=self,
                    id="fields.E309",
                )
            )
        return errors

    def _check_relation_model_exists(self):
        """Check that the related model is known to the package registry."""
        rel_is_missing = self.remote_field.model not in self.opts.packages.get_models()
        rel_is_string = isinstance(self.remote_field.model, str)
        model_name = (
            self.remote_field.model
            if rel_is_string
            else self.remote_field.model._meta.object_name
        )
        # A missing, swapped-out model is reported by
        # _check_referencing_to_swapped_model() instead.
        if rel_is_missing and (
            rel_is_string or not self.remote_field.model._meta.swapped
        ):
            return [
                preflight.Error(
                    f"Field defines a relation with model '{model_name}', which is "
                    "either not installed, or is abstract.",
                    obj=self,
                    id="fields.E300",
                )
            ]
        return []

    def _check_referencing_to_swapped_model(self):
        """Check that the relation does not point at a swapped-out model."""
        if (
            self.remote_field.model not in self.opts.packages.get_models()
            and not isinstance(self.remote_field.model, str)
            and self.remote_field.model._meta.swapped
        ):
            rel_meta = self.remote_field.model._meta
            return [
                preflight.Error(
                    f"Field defines a relation with the model '{rel_meta.label}', "
                    "which has been swapped out.",
                    hint=(
                        "Update the relation to point at "
                        f"'settings.{rel_meta.swappable}'."
                    ),
                    obj=self,
                    id="fields.E301",
                )
            ]
        return []

    def _check_clashes(self):
        """Check accessor and reverse query name clashes."""
        from plain.models.base import ModelBase

        errors = []
        opts = self.model._meta

        # f.remote_field.model may be a string instead of a model. Skip if
        # model name is not resolved.
        if not isinstance(self.remote_field.model, ModelBase):
            return []

        # Consider that we are checking field `Model.foreign` and the models
        # are:
        #
        #     class Target(models.Model):
        #         model = models.IntegerField()
        #         model_set = models.IntegerField()
        #
        #     class Model(models.Model):
        #         foreign = models.ForeignKey(Target)
        #         m2m = models.ManyToManyField(Target)

        # rel_opts.object_name == "Target"
        rel_opts = self.remote_field.model._meta
        # If the field doesn't install a backward relation on the target model
        # (so `is_hidden` returns True), then there are no accessor clashes to
        # check; reverse query name clashes are still checked.
        rel_is_hidden = self.remote_field.is_hidden()
        rel_name = self.remote_field.get_accessor_name()  # i. e. "model_set"
        rel_query_name = self.related_query_name()  # i. e. "model"
        # i.e. "package_label.Model.field".
        field_name = f"{opts.label}.{self.name}"

        # Check clashes between accessor or reverse query name of `field`
        # and any other field name -- i.e. accessor for Model.foreign is
        # model_set and it clashes with Target.model_set.
        potential_clashes = rel_opts.fields + rel_opts.many_to_many
        for clash_field in potential_clashes:
            # i.e. "package_label.Target.model_set".
            clash_name = f"{rel_opts.label}.{clash_field.name}"
            if not rel_is_hidden and clash_field.name == rel_name:
                errors.append(
                    preflight.Error(
                        f"Reverse accessor '{rel_opts.object_name}.{rel_name}' "
                        f"for '{field_name}' clashes with field name "
                        f"'{clash_name}'.",
                        hint=(
                            "Rename field '{}', or add/change a related_name "
                            "argument to the definition for field '{}'."
                        ).format(clash_name, field_name),
                        obj=self,
                        id="fields.E302",
                    )
                )

            if clash_field.name == rel_query_name:
                errors.append(
                    preflight.Error(
                        "Reverse query name for '{}' clashes with field name '{}'.".format(
                            field_name, clash_name
                        ),
                        hint=(
                            "Rename field '{}', or add/change a related_name "
                            "argument to the definition for field '{}'."
                        ).format(clash_name, field_name),
                        obj=self,
                        id="fields.E303",
                    )
                )

        # Check clashes between accessors/reverse query names of `field` and
        # any other field accessor -- i. e. Model.foreign accessor clashes with
        # Model.m2m accessor.
        potential_clashes = (r for r in rel_opts.related_objects if r.field is not self)
        for clash_field in potential_clashes:
            # i.e. "package_label.Model.m2m".
            clash_name = "{}.{}".format(
                clash_field.related_model._meta.label,
                clash_field.field.name,
            )
            if not rel_is_hidden and clash_field.get_accessor_name() == rel_name:
                errors.append(
                    preflight.Error(
                        f"Reverse accessor '{rel_opts.object_name}.{rel_name}' "
                        f"for '{field_name}' clashes with reverse accessor for "
                        f"'{clash_name}'.",
                        hint=(
                            "Add or change a related_name argument "
                            f"to the definition for '{field_name}' or '{clash_name}'."
                        ),
                        obj=self,
                        id="fields.E304",
                    )
                )

            if clash_field.get_accessor_name() == rel_query_name:
                errors.append(
                    preflight.Error(
                        "Reverse query name for '{}' clashes with reverse query name "
                        "for '{}'.".format(field_name, clash_name),
                        hint=(
                            "Add or change a related_name argument "
                            f"to the definition for '{field_name}' or '{clash_name}'."
                        ),
                        obj=self,
                        id="fields.E305",
                    )
                )

        return errors

    def db_type(self, connection):
        # By default related field will not have a column as it relates to
        # columns from another table.
        return None

    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
        """
        Attach the field to `cls` and, for non-abstract models, expand the
        related_name / related_query_name placeholders and schedule
        do_related_class() through lazy_related_operation().
        """
        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)

        self.opts = cls._meta

        if not cls._meta.abstract:
            if self.remote_field.related_name:
                related_name = self.remote_field.related_name
            else:
                related_name = self.opts.default_related_name
            if related_name:
                related_name %= {
                    "class": cls.__name__.lower(),
                    "model_name": cls._meta.model_name.lower(),
                    "package_label": cls._meta.package_label.lower(),
                }
                self.remote_field.related_name = related_name

            if self.remote_field.related_query_name:
                related_query_name = self.remote_field.related_query_name % {
                    "class": cls.__name__.lower(),
                    "package_label": cls._meta.package_label.lower(),
                }
                self.remote_field.related_query_name = related_query_name

            def resolve_related_class(model, related, field):
                # Replace a possibly-string reference with the resolved model
                # before wiring up the relation.
                field.remote_field.model = related
                field.do_related_class(related, model)

            lazy_related_operation(
                resolve_related_class, cls, self.remote_field.model, field=self
            )

    def deconstruct(self):
        """Extend the parent deconstruction with the relation-specific kwargs."""
        name, path, args, kwargs = super().deconstruct()
        if self._limit_choices_to:
            kwargs["limit_choices_to"] = self._limit_choices_to
        if self._related_name is not None:
            kwargs["related_name"] = self._related_name
        if self._related_query_name is not None:
            kwargs["related_query_name"] = self._related_query_name
        return name, path, args, kwargs

    def get_forward_related_filter(self, obj):
        """
        Return a dict of filter keyword arguments, one entry per pair in
        self.related_fields, keyed "<field name>__<remote field name>" and
        valued with the matching attribute read from obj. obj is presumably
        an instance of the related (remote) model, since its remote-side
        attnames are read.
        """
        return {
            f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
            for _, rh_field in self.related_fields
        }

    def get_reverse_related_filter(self, obj):
        """
        Complement to get_forward_related_filter(). Return a Q object that
        matches each remote field's attname against the value of the
        corresponding local field read from obj, ANDed with the result of
        get_extra_descriptor_filter(obj) when that is non-empty. obj is
        presumably an instance of self.model.
        """
        base_q = Q.create(
            [
                (rh_field.attname, getattr(obj, lh_field.attname))
                for lh_field, rh_field in self.related_fields
            ]
        )
        descriptor_filter = self.get_extra_descriptor_filter(obj)
        if isinstance(descriptor_filter, dict):
            return base_q & Q(**descriptor_filter)
        elif descriptor_filter:
            return base_q & descriptor_filter
        return base_q

    @property
    def swappable_setting(self):
        """
        Get the setting that this is powered from for swapping, or None
        if it's not swapped in / marked with swappable=False.
        """
        if self.swappable:
            # Work out string form of "to"
            if isinstance(self.remote_field.model, str):
                to_string = self.remote_field.model
            else:
                to_string = self.remote_field.model._meta.label
            return packages.get_swappable_settings_name(to_string)
        return None

    def set_attributes_from_rel(self):
        """Default the field name to "<model_name>_<pk name>" of the related model."""
        self.name = self.name or (
            self.remote_field.model._meta.model_name
            + "_"
            + self.remote_field.model._meta.pk.name
        )
        self.remote_field.set_field_name()

    def do_related_class(self, other, cls):
        """Finish setting up the relation with the related model `other`."""
        self.set_attributes_from_rel()
        self.contribute_to_related_class(other, self.remote_field)

    def get_limit_choices_to(self):
        """
        Return ``limit_choices_to`` for this model field.

        If it is a callable, it will be invoked and the result will be
        returned.
        """
        if callable(self.remote_field.limit_choices_to):
            return self.remote_field.limit_choices_to()
        return self.remote_field.limit_choices_to

    def related_query_name(self):
        """
        Define the name that can be used to identify this related object in a
        table-spanning query.
        """
        return (
            self.remote_field.related_query_name
            or self.remote_field.related_name
            or self.opts.model_name
        )

    @property
    def target_field(self):
        """
        When filtering against this relation, return the field on the remote
        model against which the filtering should happen.

        Raise FieldError if the relation has more than one target field.
        """
        target_fields = self.path_infos[-1].target_fields
        if len(target_fields) > 1:
            raise exceptions.FieldError(
                "The relation has multiple target fields, but only single target field "
                "was asked for"
            )
        return target_fields[0]

    def get_cache_name(self):
        """Return the field's name as its cache name."""
        return self.name
482
483
class ForeignObject(RelatedField):
    """
    Abstraction of the ForeignKey relation to support multi-column relations.
    """

    # Field flags
    many_to_many = False
    many_to_one = True
    one_to_many = False
    one_to_one = False

    requires_unique_target = True
    related_accessor_class = ReverseManyToOneDescriptor
    forward_related_accessor_class = ForwardManyToOneDescriptor
    rel_class = ForeignObjectRel

    def __init__(
        self,
        to,
        on_delete,
        from_fields,
        to_fields,
        rel=None,
        related_name=None,
        related_query_name=None,
        limit_choices_to=None,
        parent_link=False,
        swappable=True,
        **kwargs,
    ):
        # Build the relation descriptor object unless the caller supplied one.
        if rel is None:
            rel = self.rel_class(
                self,
                to,
                related_name=related_name,
                related_query_name=related_query_name,
                limit_choices_to=limit_choices_to,
                parent_link=parent_link,
                on_delete=on_delete,
            )

        super().__init__(
            rel=rel,
            related_name=related_name,
            related_query_name=related_query_name,
            limit_choices_to=limit_choices_to,
            **kwargs,
        )

        self.from_fields = from_fields
        self.to_fields = to_fields
        self.swappable = swappable

    def __copy__(self):
        obj = super().__copy__()
        # Remove any cached PathInfo values.
        obj.__dict__.pop("path_infos", None)
        obj.__dict__.pop("reverse_path_infos", None)
        return obj

    def check(self, **kwargs):
        """Return the RelatedField checks plus the to_fields/uniqueness checks."""
        return [
            *super().check(**kwargs),
            *self._check_to_fields_exist(),
            *self._check_unique_target(),
        ]

    def _check_to_fields_exist(self):
        """Check that every named to_field exists on the related model."""
        # Skip nonexistent models.
        if isinstance(self.remote_field.model, str):
            return []

        errors = []
        for to_field in self.to_fields:
            if to_field:
                try:
                    self.remote_field.model._meta.get_field(to_field)
                except exceptions.FieldDoesNotExist:
                    errors.append(
                        preflight.Error(
                            f"The to_field '{to_field}' doesn't exist on the related "
                            f"model '{self.remote_field.model._meta.label}'.",
                            obj=self,
                            id="fields.E312",
                        )
                    )
        return errors

    def _check_unique_target(self):
        """Check that the referenced remote fields are covered by a unique rule."""
        rel_is_string = isinstance(self.remote_field.model, str)
        if rel_is_string or not self.requires_unique_target:
            return []

        try:
            self.foreign_related_fields
        except exceptions.FieldDoesNotExist:
            # A missing field is reported by _check_to_fields_exist().
            return []

        if not self.foreign_related_fields:
            return []

        # Each entry is a set of field names that is unique on the remote model.
        unique_foreign_fields = {
            frozenset([f.name])
            for f in self.remote_field.model._meta.get_fields()
            if getattr(f, "unique", False)
        }
        unique_foreign_fields.update(
            {
                frozenset(uc.fields)
                for uc in self.remote_field.model._meta.total_unique_constraints
            }
        )
        foreign_fields = {f.name for f in self.foreign_related_fields}
        has_unique_constraint = any(u <= foreign_fields for u in unique_foreign_fields)

        if not has_unique_constraint and len(self.foreign_related_fields) > 1:
            field_combination = ", ".join(
                f"'{rel_field.name}'" for rel_field in self.foreign_related_fields
            )
            model_name = self.remote_field.model.__name__
            return [
                preflight.Error(
                    "No subset of the fields {} on model '{}' is unique.".format(
                        field_combination, model_name
                    ),
                    hint=(
                        "Mark a single field as unique=True or add a set of "
                        "fields to a unique constraint (via "
                        "a UniqueConstraint (without condition) in the "
                        "model Meta.constraints)."
                    ),
                    obj=self,
                    id="fields.E310",
                )
            ]
        elif not has_unique_constraint:
            field_name = self.foreign_related_fields[0].name
            model_name = self.remote_field.model.__name__
            return [
                preflight.Error(
                    "'{}.{}' must be unique because it is referenced by "
                    "a foreign key.".format(model_name, field_name),
                    hint=(
                        "Add unique=True to this field or add a "
                        "UniqueConstraint (without condition) in the model "
                        "Meta.constraints."
                    ),
                    obj=self,
                    id="fields.E311",
                )
            ]
        else:
            return []

    def deconstruct(self):
        """Extend the parent deconstruction with the foreign-object kwargs."""
        name, path, args, kwargs = super().deconstruct()
        kwargs["on_delete"] = self.remote_field.on_delete
        kwargs["from_fields"] = self.from_fields
        kwargs["to_fields"] = self.to_fields

        if self.remote_field.parent_link:
            kwargs["parent_link"] = self.remote_field.parent_link
        # "to" is emitted with the model name lower-cased.
        if isinstance(self.remote_field.model, str):
            if "." in self.remote_field.model:
                package_label, model_name = self.remote_field.model.split(".")
                kwargs["to"] = f"{package_label}.{model_name.lower()}"
            else:
                kwargs["to"] = self.remote_field.model.lower()
        else:
            kwargs["to"] = self.remote_field.model._meta.label_lower
        # If swappable is True, then see if we're actually pointing to the target
        # of a swap.
        swappable_setting = self.swappable_setting
        if swappable_setting is not None:
            # If it's already a settings reference, error
            if hasattr(kwargs["to"], "setting_name"):
                if kwargs["to"].setting_name != swappable_setting:
                    raise ValueError(
                        "Cannot deconstruct a ForeignKey pointing to a model "
                        "that is swapped in place of more than one model ({} and {})".format(
                            kwargs["to"].setting_name, swappable_setting
                        )
                    )
            # Set it
            kwargs["to"] = SettingsReference(
                kwargs["to"],
                swappable_setting,
            )
        return name, path, args, kwargs

    def resolve_related_fields(self):
        """
        Return a list of (from_field, to_field) pairs built from from_fields
        and to_fields.

        Raise ValueError if the two name lists are empty or differ in length,
        or if the related model is still an unresolved string.
        """
        if not self.from_fields or len(self.from_fields) != len(self.to_fields):
            raise ValueError(
                "Foreign Object from and to fields must be the same non-zero length"
            )
        if isinstance(self.remote_field.model, str):
            raise ValueError(
                f"Related model {self.remote_field.model!r} cannot be resolved"
            )
        related_fields = []
        # The lengths were checked equal above, so zip() drops nothing.
        for from_field_name, to_field_name in zip(self.from_fields, self.to_fields):
            from_field = (
                self
                if from_field_name == RECURSIVE_RELATIONSHIP_CONSTANT
                else self.opts.get_field(from_field_name)
            )
            to_field = (
                self.remote_field.model._meta.pk
                if to_field_name is None
                else self.remote_field.model._meta.get_field(to_field_name)
            )
            related_fields.append((from_field, to_field))
        return related_fields

    @cached_property
    def related_fields(self):
        return self.resolve_related_fields()

    @cached_property
    def reverse_related_fields(self):
        return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]

    @cached_property
    def local_related_fields(self):
        return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)

    @cached_property
    def foreign_related_fields(self):
        return tuple(
            rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
        )

    def get_local_related_value(self, instance):
        return self.get_instance_value_for_fields(instance, self.local_related_fields)

    def get_foreign_related_value(self, instance):
        return self.get_instance_value_for_fields(instance, self.foreign_related_fields)

    @staticmethod
    def get_instance_value_for_fields(instance, fields):
        """Return a tuple with the value of each of `fields` read from `instance`."""
        ret = []
        opts = instance._meta
        for field in fields:
            # Gotcha: in some cases (like fixture loading) a model can have
            # different values in parent_ptr_id and parent's id. So, use
            # instance.pk (that is, parent_ptr_id) when asked for instance.id.
            if field.primary_key:
                possible_parent_link = opts.get_ancestor_link(field.model)
                if (
                    not possible_parent_link
                    or possible_parent_link.primary_key
                    or possible_parent_link.model._meta.abstract
                ):
                    ret.append(instance.pk)
                    continue
            ret.append(getattr(instance, field.attname))
        return tuple(ret)

    def get_attname_column(self):
        # Only the attname is kept; the column is reported as None.
        attname, column = super().get_attname_column()
        return attname, None

    def get_joining_columns(self, reverse_join=False):
        """Return (lhs column, rhs column) pairs, swapped when reverse_join is set."""
        source = self.reverse_related_fields if reverse_join else self.related_fields
        return tuple(
            (lhs_field.column, rhs_field.column) for lhs_field, rhs_field in source
        )

    def get_reverse_joining_columns(self):
        return self.get_joining_columns(reverse_join=True)

    def get_extra_descriptor_filter(self, instance):
        """
        Return an extra filter condition for related object fetching when
        user does 'instance.fieldname', that is the extra filter is used in
        the descriptor of the field.

        The filter should be either a dict usable in .filter(**kwargs) call or
        a Q-object. The condition will be ANDed together with the relation's
        joining columns.

        A parallel method is get_extra_restriction() which is used in
        JOIN and subquery conditions.
        """
        return {}

    def get_extra_restriction(self, alias, related_alias):
        """
        Return a pair condition used for joining and subquery pushdown. The
        condition is something that responds to as_sql(compiler, connection)
        method.

        Note that currently referring both the 'alias' and 'related_alias'
        will not work in some conditions, like subquery pushdown.

        A parallel method is get_extra_descriptor_filter() which is used in
        instance.fieldname related object fetching.
        """
        return None

    def get_path_info(self, filtered_relation=None):
        """Get path from this field to the related model."""
        opts = self.remote_field.model._meta
        from_opts = self.model._meta
        return [
            PathInfo(
                from_opts=from_opts,
                to_opts=opts,
                target_fields=self.foreign_related_fields,
                join_field=self,
                m2m=False,
                direct=True,
                filtered_relation=filtered_relation,
            )
        ]

    @cached_property
    def path_infos(self):
        return self.get_path_info()

    def get_reverse_path_info(self, filtered_relation=None):
        """Get path from the related model to this field's model."""
        opts = self.model._meta
        from_opts = self.remote_field.model._meta
        return [
            PathInfo(
                from_opts=from_opts,
                to_opts=opts,
                target_fields=(opts.pk,),
                join_field=self.remote_field,
                m2m=not self.unique,
                direct=False,
                filtered_relation=filtered_relation,
            )
        ]

    @cached_property
    def reverse_path_infos(self):
        return self.get_reverse_path_info()

    @classmethod
    @functools.cache
    def get_class_lookups(cls):
        """Merge the class_lookups dicts declared from cls up to ForeignObject."""
        bases = inspect.getmro(cls)
        # Stop at ForeignObject: classes after it in the MRO are left out.
        bases = bases[: bases.index(ForeignObject) + 1]
        class_lookups = [parent.__dict__.get("class_lookups", {}) for parent in bases]
        return cls.merge_dicts(class_lookups)

    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)
        # Install the forward accessor descriptor under the field's name.
        setattr(cls, self.name, self.forward_related_accessor_class(self))

    def contribute_to_related_class(self, cls, related):
        # Internal FK's - i.e., those with a related name ending with '+' -
        # and swapped models don't get a related descriptor.
        if (
            not self.remote_field.is_hidden()
            and not related.related_model._meta.swapped
        ):
            setattr(
                cls._meta.concrete_model,
                related.get_accessor_name(),
                self.related_accessor_class(related),
            )
            # While 'limit_choices_to' might be a callable, simply pass
            # it along for later - this is too early because it's still
            # model load time.
            if self.remote_field.limit_choices_to:
                cls._meta.related_fkey_lookups.append(
                    self.remote_field.limit_choices_to
                )
857
858
# Register the relation-aware lookup classes on ForeignObject.
ForeignObject.register_lookup(RelatedIn)
ForeignObject.register_lookup(RelatedExact)
ForeignObject.register_lookup(RelatedLessThan)
ForeignObject.register_lookup(RelatedGreaterThan)
ForeignObject.register_lookup(RelatedGreaterThanOrEqual)
ForeignObject.register_lookup(RelatedLessThanOrEqual)
ForeignObject.register_lookup(RelatedIsNull)
866
867
class ForeignKey(ForeignObject):
    """
    Provide a many-to-one relation by adding a column to the local model
    to hold the remote value.

    By default ForeignKey will target the pk of the remote model but this
    behavior can be changed by using the ``to_field`` argument.
    """

    descriptor_class = ForeignKeyDeferredAttribute
    # Field flags
    many_to_many = False
    many_to_one = True
    one_to_many = False
    one_to_one = False

    rel_class = ManyToOneRel

    empty_strings_allowed = False
    default_error_messages = {
        "invalid": "%(model)s instance with %(field)s %(value)r does not exist."
    }
    description = "Foreign Key (type determined by related field)"

    def __init__(
        self,
        to,
        on_delete,
        related_name=None,
        related_query_name=None,
        limit_choices_to=None,
        parent_link=False,
        to_field=None,
        db_constraint=True,
        **kwargs,
    ):
        try:
            to._meta.model_name
        except AttributeError:
            # Not a model class: only a string reference is acceptable.
            if not isinstance(to, str):
                raise TypeError(
                    "{}({!r}) is invalid. First parameter to ForeignKey must be "
                    "either a model, a model name, or the string {!r}".format(
                        self.__class__.__name__,
                        to,
                        RECURSIVE_RELATIONSHIP_CONSTANT,
                    )
                )
        else:
            # For backwards compatibility purposes, we need to *try* and set
            # the to_field during FK construction. It won't be guaranteed to
            # be correct until contribute_to_class is called. Refs #12190.
            to_field = to_field or (to._meta.pk and to._meta.pk.name)
        if not callable(on_delete):
            raise TypeError("on_delete must be callable.")

        kwargs["rel"] = self.rel_class(
            self,
            to,
            to_field,
            related_name=related_name,
            related_query_name=related_query_name,
            limit_choices_to=limit_choices_to,
            parent_link=parent_link,
            on_delete=on_delete,
        )
        # Indexed unless the caller passes db_index explicitly.
        kwargs.setdefault("db_index", True)

        super().__init__(
            to,
            on_delete,
            related_name=related_name,
            related_query_name=related_query_name,
            limit_choices_to=limit_choices_to,
            from_fields=[RECURSIVE_RELATIONSHIP_CONSTANT],
            to_fields=[to_field],
            **kwargs,
        )
        self.db_constraint = db_constraint

    def __class_getitem__(cls, *args, **kwargs):
        # Subscripting the class (ForeignKey[...]) returns the class unchanged.
        return cls

    def check(self, **kwargs):
        """Return the ForeignObject checks plus the on_delete/unique checks."""
        return [
            *super().check(**kwargs),
            *self._check_on_delete(),
            *self._check_unique(),
        ]

    def _check_on_delete(self):
        """Check that SET_NULL / SET_DEFAULT are usable with this field."""
        on_delete = getattr(self.remote_field, "on_delete", None)
        if on_delete == SET_NULL and not self.null:
            return [
                preflight.Error(
                    "Field specifies on_delete=SET_NULL, but cannot be null.",
                    hint=(
                        "Set null=True argument on the field, or change the on_delete "
                        "rule."
                    ),
                    obj=self,
                    id="fields.E320",
                )
            ]
        elif on_delete == SET_DEFAULT and not self.has_default():
            return [
                preflight.Error(
                    "Field specifies on_delete=SET_DEFAULT, but has no default value.",
                    hint="Set a default value, or change the on_delete rule.",
                    obj=self,
                    id="fields.E321",
                )
            ]
        else:
            return []

    def _check_unique(self, **kwargs):
        """Warn when unique=True is set on the field."""
        return (
            [
                preflight.Warning(
                    "Setting unique=True on a ForeignKey has the same effect as using "
                    "a OneToOneField.",
                    hint=(
                        "ForeignKey(unique=True) is usually better served by a "
                        "OneToOneField."
                    ),
                    obj=self,
                    id="fields.W342",
                )
            ]
            if self.unique
            else []
        )

    def deconstruct(self):
        """Reduce the ForeignObject deconstruction to ForeignKey's own arguments."""
        name, path, args, kwargs = super().deconstruct()
        del kwargs["to_fields"]
        del kwargs["from_fields"]
        # Handle the simpler arguments
        if self.db_index:
            del kwargs["db_index"]
        else:
            kwargs["db_index"] = False
        if self.db_constraint is not True:
            kwargs["db_constraint"] = self.db_constraint
        # Rel needs more work.
        to_meta = getattr(self.remote_field.model, "_meta", None)
        # Emit to_field only when it is not simply the related model's pk.
        if self.remote_field.field_name and (
            not to_meta
            or (to_meta.pk and self.remote_field.field_name != to_meta.pk.name)
        ):
            kwargs["to_field"] = self.remote_field.field_name
        return name, path, args, kwargs

    def to_python(self, value):
        return self.target_field.to_python(value)

    @property
    def target_field(self):
        """Return the first (for a ForeignKey, the only) foreign related field."""
        return self.foreign_related_fields[0]

    def validate(self, value, model_instance):
        """
        Run the parent validation, then raise ValidationError if no related row
        matches `value` within limit_choices_to.

        Parent links and None values are not checked against the database.
        """
        if self.remote_field.parent_link:
            return
        super().validate(value, model_instance)
        if value is None:
            return

        using = router.db_for_read(self.remote_field.model, instance=model_instance)
        qs = self.remote_field.model._base_manager.using(using).filter(
            **{self.remote_field.field_name: value}
        )
        qs = qs.complex_filter(self.get_limit_choices_to())
        if not qs.exists():
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={
                    "model": self.remote_field.model._meta.model_name,
                    "pk": value,
                    "field": self.remote_field.field_name,
                    "value": value,
                },  # 'pk' is included for backwards compatibility
            )

    def resolve_related_fields(self):
        """
        Return the parent's (from_field, to_field) pairs, raising FieldError if
        a target field does not belong to the related model's concrete model.
        """
        related_fields = super().resolve_related_fields()
        for from_field, to_field in related_fields:
            if (
                to_field
                and to_field.model != self.remote_field.model._meta.concrete_model
            ):
                raise exceptions.FieldError(
                    "'{}.{}' refers to field '{}' which is not local to model "
                    "'{}'.".format(
                        self.model._meta.label,
                        self.name,
                        to_field.name,
                        self.remote_field.model._meta.concrete_model._meta.label,
                    )
                )
        return related_fields

    def get_attname(self):
        return f"{self.name}_id"

    def get_attname_column(self):
        attname = self.get_attname()
        column = self.db_column or attname
        return attname, column

    def get_default(self):
        """Return the to_field if the default value is an object."""
        field_default = super().get_default()
        if isinstance(field_default, self.remote_field.model):
            return getattr(field_default, self.target_field.attname)
        return field_default

    def get_db_prep_save(self, value, connection):
        # None is stored as NULL, and so is an empty string when the target
        # field disallows empty strings or the backend treats them as NULL.
        if value is None or (
            value == ""
            and (
                not self.target_field.empty_strings_allowed
                or connection.features.interprets_empty_strings_as_nulls
            )
        ):
            return None
        else:
            return self.target_field.get_db_prep_save(value, connection=connection)

    def get_db_prep_value(self, value, connection, prepared=False):
        return self.target_field.get_db_prep_value(value, connection, prepared)

    def get_prep_value(self, value):
        return self.target_field.get_prep_value(value)
1103
1104 def contribute_to_related_class(self, cls, related):
1105 super().contribute_to_related_class(cls, related)
1106 if self.remote_field.field_name is None:
1107 self.remote_field.field_name = cls._meta.pk.name
1108
1109 def db_check(self, connection):
1110 return None
1111
1112 def db_type(self, connection):
1113 return self.target_field.rel_db_type(connection=connection)
1114
1115 def cast_db_type(self, connection):
1116 return self.target_field.cast_db_type(connection=connection)
1117
1118 def db_parameters(self, connection):
1119 target_db_parameters = self.target_field.db_parameters(connection)
1120 return {
1121 "type": self.db_type(connection),
1122 "check": self.db_check(connection),
1123 "collation": target_db_parameters.get("collation"),
1124 }
1125
1126 def convert_empty_strings(self, value, expression, connection):
1127 if (not value) and isinstance(value, str):
1128 return None
1129 return value
1130
1131 def get_db_converters(self, connection):
1132 converters = super().get_db_converters(connection)
1133 if connection.features.interprets_empty_strings_as_nulls:
1134 converters += [self.convert_empty_strings]
1135 return converters
1136
1137 def get_col(self, alias, output_field=None):
1138 if output_field is None:
1139 output_field = self.target_field
1140 while isinstance(output_field, ForeignKey):
1141 output_field = output_field.target_field
1142 if output_field is self:
1143 raise ValueError("Cannot resolve output_field.")
1144 return super().get_col(alias, output_field)
1145
1146
class OneToOneField(ForeignKey):
    """
    A ForeignKey that is always unique.

    Apart from forcing ``unique=True``, it uses one-to-one descriptors, so
    the reverse relation yields the single related object rather than a
    collection.
    """

    # Field flags
    many_to_many = False
    many_to_one = False
    one_to_many = False
    one_to_one = True

    related_accessor_class = ReverseOneToOneDescriptor
    forward_related_accessor_class = ForwardOneToOneDescriptor
    rel_class = OneToOneRel

    description = "One-to-one relationship"

    def __init__(self, to, on_delete, to_field=None, **kwargs):
        """Initialize like a ForeignKey, forcing ``unique=True``."""
        kwargs.update(unique=True)
        super().__init__(to, on_delete, to_field=to_field, **kwargs)

    def deconstruct(self):
        """Return the inherited deconstruction without the ``unique`` kwarg."""
        name, path, args, kwargs = super().deconstruct()
        kwargs.pop("unique", None)
        return name, path, args, kwargs

    def save_form_data(self, instance, data):
        """
        Store form ``data`` on ``instance``.

        A related-model instance is assigned through the field name; any
        other value is assigned to the attname.
        """
        if isinstance(data, self.remote_field.model):
            setattr(instance, self.name, data)
            return

        setattr(instance, self.attname, data)
        # Remote field object must be cleared otherwise Model.save()
        # will reassign attname using the related object pk.
        if data is None:
            setattr(instance, self.name, data)

    def _check_unique(self, **kwargs):
        """Return no messages; the ForeignKey check isn't applicable here."""
        return []
1190
1191
def create_many_to_many_intermediary_model(field, klass):
    """
    Build and return the auto-created through model for the many-to-many
    ``field`` declared on ``klass``.

    The model is named ``<ObjectName>_<field name>``, carries one ForeignKey
    to each side of the relation and a unique constraint over the pair.
    """
    from plain import models

    def set_managed(model, related, through):
        # The through model is managed when either side of the relation is.
        through._meta.managed = model._meta.managed or related._meta.managed

    owner_opts = klass._meta
    to_model = resolve_relation(klass, field.remote_field.model)
    name = f"{owner_opts.object_name}_{field.name}"
    lazy_related_operation(set_managed, klass, to_model, name)

    to = make_model_tuple(to_model)[1]
    from_ = owner_opts.model_name
    if to == from_:
        # Both ends have the same model name: disambiguate the two columns.
        to, from_ = f"to_{to}", f"from_{from_}"

    meta_attrs = {
        "db_table": field._get_m2m_db_table(klass._meta),
        "auto_created": klass,
        "package_label": klass._meta.package_label,
        "db_tablespace": klass._meta.db_tablespace,
        "constraints": [
            models.UniqueConstraint(
                fields=[from_, to],
                name=f"{klass._meta.package_label}_{name.lower()}_unique",
            )
        ],
        "packages": field.model._meta.packages,
    }
    meta = type("Meta", (), meta_attrs)

    def link_to(target):
        # One side of the relation; deleting the target cascades to the row.
        return models.ForeignKey(
            target,
            related_name=f"{name}+",
            db_tablespace=field.db_tablespace,
            db_constraint=field.remote_field.db_constraint,
            on_delete=CASCADE,
        )

    # Construct and return the new class.
    model_attrs = {
        "Meta": meta,
        "__module__": klass.__module__,
        from_: link_to(klass),
        to: link_to(to_model),
    }
    return type(name, (models.Model,), model_attrs)
1248
1249
1250class ManyToManyField(RelatedField):
1251 """
1252 Provide a many-to-many relation by using an intermediary model that
1253 holds two ForeignKey fields pointed at the two sides of the relation.
1254
1255 Unless a ``through`` model was provided, ManyToManyField will use the
1256 create_many_to_many_intermediary_model factory to automatically generate
1257 the intermediary model.
1258 """
1259
1260 # Field flags
1261 many_to_many = True
1262 many_to_one = False
1263 one_to_many = False
1264 one_to_one = False
1265
1266 rel_class = ManyToManyRel
1267
1268 description = "Many-to-many relationship"
1269
1270 def __init__(
1271 self,
1272 to,
1273 related_name=None,
1274 related_query_name=None,
1275 limit_choices_to=None,
1276 symmetrical=None,
1277 through=None,
1278 through_fields=None,
1279 db_constraint=True,
1280 db_table=None,
1281 swappable=True,
1282 **kwargs,
1283 ):
1284 try:
1285 to._meta
1286 except AttributeError:
1287 if not isinstance(to, str):
1288 raise TypeError(
1289 "{}({!r}) is invalid. First parameter to ManyToManyField "
1290 "must be either a model, a model name, or the string {!r}".format(
1291 self.__class__.__name__,
1292 to,
1293 RECURSIVE_RELATIONSHIP_CONSTANT,
1294 )
1295 )
1296
1297 if symmetrical is None:
1298 symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
1299
1300 if through is not None and db_table is not None:
1301 raise ValueError(
1302 "Cannot specify a db_table if an intermediary model is used."
1303 )
1304
1305 kwargs["rel"] = self.rel_class(
1306 self,
1307 to,
1308 related_name=related_name,
1309 related_query_name=related_query_name,
1310 limit_choices_to=limit_choices_to,
1311 symmetrical=symmetrical,
1312 through=through,
1313 through_fields=through_fields,
1314 db_constraint=db_constraint,
1315 )
1316 self.has_null_arg = "null" in kwargs
1317
1318 super().__init__(
1319 related_name=related_name,
1320 related_query_name=related_query_name,
1321 limit_choices_to=limit_choices_to,
1322 **kwargs,
1323 )
1324
1325 self.db_table = db_table
1326 self.swappable = swappable
1327
1328 def check(self, **kwargs):
1329 return [
1330 *super().check(**kwargs),
1331 *self._check_unique(**kwargs),
1332 *self._check_relationship_model(**kwargs),
1333 *self._check_ignored_options(**kwargs),
1334 *self._check_table_uniqueness(**kwargs),
1335 ]
1336
1337 def _check_unique(self, **kwargs):
1338 if self.unique:
1339 return [
1340 preflight.Error(
1341 "ManyToManyFields cannot be unique.",
1342 obj=self,
1343 id="fields.E330",
1344 )
1345 ]
1346 return []
1347
1348 def _check_ignored_options(self, **kwargs):
1349 warnings = []
1350
1351 if self.has_null_arg:
1352 warnings.append(
1353 preflight.Warning(
1354 "null has no effect on ManyToManyField.",
1355 obj=self,
1356 id="fields.W340",
1357 )
1358 )
1359
1360 if self._validators:
1361 warnings.append(
1362 preflight.Warning(
1363 "ManyToManyField does not support validators.",
1364 obj=self,
1365 id="fields.W341",
1366 )
1367 )
1368 if self.remote_field.symmetrical and self._related_name:
1369 warnings.append(
1370 preflight.Warning(
1371 "related_name has no effect on ManyToManyField "
1372 'with a symmetrical relationship, e.g. to "self".',
1373 obj=self,
1374 id="fields.W345",
1375 )
1376 )
1377 if self.db_comment:
1378 warnings.append(
1379 preflight.Warning(
1380 "db_comment has no effect on ManyToManyField.",
1381 obj=self,
1382 id="fields.W346",
1383 )
1384 )
1385
1386 return warnings
1387
    def _check_relationship_model(self, from_model=None, **kwargs):
        """
        Validate the intermediary ("through") model of this relation.

        Returns a list of preflight errors covering:

        * a through model that is not among the installed models (E331);
        * an ambiguous number of foreign keys on the through model when no
          ``through_fields`` is given (E333, E334, E335);
        * a through model lacking a foreign key to either side (E336);
        * a malformed ``through_fields`` value (E337), or entries that name a
          missing field (E338) or a field that is not a foreign key to the
          expected model (E339).

        ``from_model`` is the model the field is attached to; it is required
        (by assertion) once the through model is installed or when
        ``through_fields`` has to be validated.
        """
        # Name used in messages: "package_label.ClassName" for a model class,
        # otherwise the raw `through` value.
        if hasattr(self.remote_field.through, "_meta"):
            qualified_model_name = "{}.{}".format(
                self.remote_field.through._meta.package_label,
                self.remote_field.through.__name__,
            )
        else:
            qualified_model_name = self.remote_field.through

        errors = []

        if self.remote_field.through not in self.opts.packages.get_models(
            include_auto_created=True
        ):
            # The relationship model is not installed.
            errors.append(
                preflight.Error(
                    "Field specifies a many-to-many relation through model "
                    "'%s', which has not been installed." % qualified_model_name,
                    obj=self,
                    id="fields.E331",
                )
            )

        else:
            assert from_model is not None, (
                "ManyToManyField with intermediate "
                "tables cannot be checked if you don't pass the model "
                "where the field is attached to."
            )
            # Set some useful local variables
            to_model = resolve_relation(from_model, self.remote_field.model)
            from_model_name = from_model._meta.object_name
            if isinstance(to_model, str):
                to_model_name = to_model
            else:
                to_model_name = to_model._meta.object_name
            relationship_model_name = self.remote_field.through._meta.object_name
            self_referential = from_model == to_model
            # Count foreign keys in intermediate model
            if self_referential:
                seen_self = sum(
                    from_model == getattr(field.remote_field, "model", None)
                    for field in self.remote_field.through._meta.fields
                )

                # More than two keys to the same model cannot be paired up
                # without through_fields.
                if seen_self > 2 and not self.remote_field.through_fields:
                    errors.append(
                        preflight.Error(
                            "The model is used as an intermediate model by "
                            "'{}', but it has more than two foreign keys "
                            "to '{}', which is ambiguous. You must specify "
                            "which two foreign keys Plain should use via the "
                            "through_fields keyword argument.".format(
                                self, from_model_name
                            ),
                            hint=(
                                "Use through_fields to specify which two foreign keys "
                                "Plain should use."
                            ),
                            obj=self.remote_field.through,
                            id="fields.E333",
                        )
                    )

            else:
                # Count foreign keys in relationship model
                seen_from = sum(
                    from_model == getattr(field.remote_field, "model", None)
                    for field in self.remote_field.through._meta.fields
                )
                seen_to = sum(
                    to_model == getattr(field.remote_field, "model", None)
                    for field in self.remote_field.through._meta.fields
                )

                if seen_from > 1 and not self.remote_field.through_fields:
                    errors.append(
                        preflight.Error(
                            (
                                "The model is used as an intermediate model by "
                                "'{}', but it has more than one foreign key "
                                "from '{}', which is ambiguous. You must specify "
                                "which foreign key Plain should use via the "
                                "through_fields keyword argument."
                            ).format(self, from_model_name),
                            hint=(
                                "If you want to create a recursive relationship, "
                                'use ManyToManyField("{}", through="{}").'
                            ).format(
                                RECURSIVE_RELATIONSHIP_CONSTANT,
                                relationship_model_name,
                            ),
                            obj=self,
                            id="fields.E334",
                        )
                    )

                if seen_to > 1 and not self.remote_field.through_fields:
                    errors.append(
                        preflight.Error(
                            "The model is used as an intermediate model by "
                            "'{}', but it has more than one foreign key "
                            "to '{}', which is ambiguous. You must specify "
                            "which foreign key Plain should use via the "
                            "through_fields keyword argument.".format(
                                self, to_model_name
                            ),
                            hint=(
                                "If you want to create a recursive relationship, "
                                'use ManyToManyField("{}", through="{}").'
                            ).format(
                                RECURSIVE_RELATIONSHIP_CONSTANT,
                                relationship_model_name,
                            ),
                            obj=self,
                            id="fields.E335",
                        )
                    )

                if seen_from == 0 or seen_to == 0:
                    errors.append(
                        preflight.Error(
                            "The model is used as an intermediate model by "
                            "'{}', but it does not have a foreign key to '{}' or '{}'.".format(
                                self, from_model_name, to_model_name
                            ),
                            obj=self.remote_field.through,
                            id="fields.E336",
                        )
                    )

        # Validate `through_fields`.
        if self.remote_field.through_fields is not None:
            # Validate that we're given an iterable of at least two items
            # and that none of them is "falsy".
            if not (
                len(self.remote_field.through_fields) >= 2
                and self.remote_field.through_fields[0]
                and self.remote_field.through_fields[1]
            ):
                errors.append(
                    preflight.Error(
                        "Field specifies 'through_fields' but does not provide "
                        "the names of the two link fields that should be used "
                        "for the relation through model '%s'." % qualified_model_name,
                        hint=(
                            "Make sure you specify 'through_fields' as "
                            "through_fields=('field1', 'field2')"
                        ),
                        obj=self,
                        id="fields.E337",
                    )
                )

            # Validate the given through fields -- they should be actual
            # fields on the through model, and also be foreign keys to the
            # expected models.
            else:
                assert from_model is not None, (
                    "ManyToManyField with intermediate "
                    "tables cannot be checked if you don't pass the model "
                    "where the field is attached to."
                )

                source, through, target = (
                    from_model,
                    self.remote_field.through,
                    self.remote_field.model,
                )
                # Only the first two entries are used; extras are ignored.
                source_field_name, target_field_name = self.remote_field.through_fields[
                    :2
                ]

                for field_name, related_model in (
                    (source_field_name, source),
                    (target_field_name, target),
                ):
                    # Collect the through-model fields that do point at the
                    # expected model, to suggest them in the hint.
                    possible_field_names = []
                    for f in through._meta.fields:
                        if (
                            hasattr(f, "remote_field")
                            and getattr(f.remote_field, "model", None) == related_model
                        ):
                            possible_field_names.append(f.name)
                    if possible_field_names:
                        hint = (
                            "Did you mean one of the following foreign keys to '{}': "
                            "{}?".format(
                                related_model._meta.object_name,
                                ", ".join(possible_field_names),
                            )
                        )
                    else:
                        hint = None

                    try:
                        field = through._meta.get_field(field_name)
                    except exceptions.FieldDoesNotExist:
                        errors.append(
                            preflight.Error(
                                "The intermediary model '{}' has no field '{}'.".format(
                                    qualified_model_name, field_name
                                ),
                                hint=hint,
                                obj=self,
                                id="fields.E338",
                            )
                        )
                    else:
                        # The field exists; it must also relate to the
                        # expected model.
                        if not (
                            hasattr(field, "remote_field")
                            and getattr(field.remote_field, "model", None)
                            == related_model
                        ):
                            errors.append(
                                preflight.Error(
                                    "'{}.{}' is not a foreign key to '{}'.".format(
                                        through._meta.object_name,
                                        field_name,
                                        related_model._meta.object_name,
                                    ),
                                    hint=hint,
                                    obj=self,
                                    id="fields.E339",
                                )
                            )

        return errors
1617
1618 def _check_table_uniqueness(self, **kwargs):
1619 if (
1620 isinstance(self.remote_field.through, str)
1621 or not self.remote_field.through._meta.managed
1622 ):
1623 return []
1624 registered_tables = {
1625 model._meta.db_table: model
1626 for model in self.opts.packages.get_models(include_auto_created=True)
1627 if model != self.remote_field.through and model._meta.managed
1628 }
1629 m2m_db_table = self.m2m_db_table()
1630 model = registered_tables.get(m2m_db_table)
1631 # The second condition allows multiple m2m relations on a model if
1632 # some point to a through model that proxies another through model.
1633 if (
1634 model
1635 and model._meta.concrete_model
1636 != self.remote_field.through._meta.concrete_model
1637 ):
1638 if model._meta.auto_created:
1639
1640 def _get_field_name(model):
1641 for field in model._meta.auto_created._meta.many_to_many:
1642 if field.remote_field.through is model:
1643 return field.name
1644
1645 opts = model._meta.auto_created._meta
1646 clashing_obj = f"{opts.label}.{_get_field_name(model)}"
1647 else:
1648 clashing_obj = model._meta.label
1649 if settings.DATABASE_ROUTERS:
1650 error_class, error_id = preflight.Warning, "fields.W344"
1651 error_hint = (
1652 "You have configured settings.DATABASE_ROUTERS. Verify "
1653 "that the table of %r is correctly routed to a separate "
1654 "database." % clashing_obj
1655 )
1656 else:
1657 error_class, error_id = preflight.Error, "fields.E340"
1658 error_hint = None
1659 return [
1660 error_class(
1661 f"The field's intermediary table '{m2m_db_table}' clashes with the "
1662 f"table name of '{clashing_obj}'.",
1663 obj=self,
1664 hint=error_hint,
1665 id=error_id,
1666 )
1667 ]
1668 return []
1669
1670 def deconstruct(self):
1671 name, path, args, kwargs = super().deconstruct()
1672 # Handle the simpler arguments.
1673 if self.db_table is not None:
1674 kwargs["db_table"] = self.db_table
1675 if self.remote_field.db_constraint is not True:
1676 kwargs["db_constraint"] = self.remote_field.db_constraint
1677 # Lowercase model names as they should be treated as case-insensitive.
1678 if isinstance(self.remote_field.model, str):
1679 if "." in self.remote_field.model:
1680 package_label, model_name = self.remote_field.model.split(".")
1681 kwargs["to"] = f"{package_label}.{model_name.lower()}"
1682 else:
1683 kwargs["to"] = self.remote_field.model.lower()
1684 else:
1685 kwargs["to"] = self.remote_field.model._meta.label_lower
1686 if getattr(self.remote_field, "through", None) is not None:
1687 if isinstance(self.remote_field.through, str):
1688 kwargs["through"] = self.remote_field.through
1689 elif not self.remote_field.through._meta.auto_created:
1690 kwargs["through"] = self.remote_field.through._meta.label
1691 # If swappable is True, then see if we're actually pointing to the target
1692 # of a swap.
1693 swappable_setting = self.swappable_setting
1694 if swappable_setting is not None:
1695 # If it's already a settings reference, error.
1696 if hasattr(kwargs["to"], "setting_name"):
1697 if kwargs["to"].setting_name != swappable_setting:
1698 raise ValueError(
1699 "Cannot deconstruct a ManyToManyField pointing to a "
1700 "model that is swapped in place of more than one model "
1701 "({} and {})".format(
1702 kwargs["to"].setting_name, swappable_setting
1703 )
1704 )
1705
1706 kwargs["to"] = SettingsReference(
1707 kwargs["to"],
1708 swappable_setting,
1709 )
1710 return name, path, args, kwargs
1711
1712 def _get_path_info(self, direct=False, filtered_relation=None):
1713 """Called by both direct and indirect m2m traversal."""
1714 int_model = self.remote_field.through
1715 linkfield1 = int_model._meta.get_field(self.m2m_field_name())
1716 linkfield2 = int_model._meta.get_field(self.m2m_reverse_field_name())
1717 if direct:
1718 join1infos = linkfield1.reverse_path_infos
1719 if filtered_relation:
1720 join2infos = linkfield2.get_path_info(filtered_relation)
1721 else:
1722 join2infos = linkfield2.path_infos
1723 else:
1724 join1infos = linkfield2.reverse_path_infos
1725 if filtered_relation:
1726 join2infos = linkfield1.get_path_info(filtered_relation)
1727 else:
1728 join2infos = linkfield1.path_infos
1729 # Get join infos between the last model of join 1 and the first model
1730 # of join 2. Assume the only reason these may differ is due to model
1731 # inheritance.
1732 join1_final = join1infos[-1].to_opts
1733 join2_initial = join2infos[0].from_opts
1734 if join1_final is join2_initial:
1735 intermediate_infos = []
1736 elif issubclass(join1_final.model, join2_initial.model):
1737 intermediate_infos = join1_final.get_path_to_parent(join2_initial.model)
1738 else:
1739 intermediate_infos = join2_initial.get_path_from_parent(join1_final.model)
1740
1741 return [*join1infos, *intermediate_infos, *join2infos]
1742
1743 def get_path_info(self, filtered_relation=None):
1744 return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1745
1746 @cached_property
1747 def path_infos(self):
1748 return self.get_path_info()
1749
1750 def get_reverse_path_info(self, filtered_relation=None):
1751 return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1752
1753 @cached_property
1754 def reverse_path_infos(self):
1755 return self.get_reverse_path_info()
1756
1757 def _get_m2m_db_table(self, opts):
1758 """
1759 Function that can be curried to provide the m2m table name for this
1760 relation.
1761 """
1762 if self.remote_field.through is not None:
1763 return self.remote_field.through._meta.db_table
1764 elif self.db_table:
1765 return self.db_table
1766 else:
1767 m2m_table_name = f"{utils.strip_quotes(opts.db_table)}_{self.name}"
1768 return utils.truncate_name(m2m_table_name, connection.ops.max_name_length())
1769
1770 def _get_m2m_attr(self, related, attr):
1771 """
1772 Function that can be curried to provide the source accessor or DB
1773 column name for the m2m table.
1774 """
1775 cache_attr = "_m2m_%s_cache" % attr
1776 if hasattr(self, cache_attr):
1777 return getattr(self, cache_attr)
1778 if self.remote_field.through_fields is not None:
1779 link_field_name = self.remote_field.through_fields[0]
1780 else:
1781 link_field_name = None
1782 for f in self.remote_field.through._meta.fields:
1783 if (
1784 f.is_relation
1785 and f.remote_field.model == related.related_model
1786 and (link_field_name is None or link_field_name == f.name)
1787 ):
1788 setattr(self, cache_attr, getattr(f, attr))
1789 return getattr(self, cache_attr)
1790
1791 def _get_m2m_reverse_attr(self, related, attr):
1792 """
1793 Function that can be curried to provide the related accessor or DB
1794 column name for the m2m table.
1795 """
1796 cache_attr = "_m2m_reverse_%s_cache" % attr
1797 if hasattr(self, cache_attr):
1798 return getattr(self, cache_attr)
1799 found = False
1800 if self.remote_field.through_fields is not None:
1801 link_field_name = self.remote_field.through_fields[1]
1802 else:
1803 link_field_name = None
1804 for f in self.remote_field.through._meta.fields:
1805 if f.is_relation and f.remote_field.model == related.model:
1806 if link_field_name is None and related.related_model == related.model:
1807 # If this is an m2m-intermediate to self,
1808 # the first foreign key you find will be
1809 # the source column. Keep searching for
1810 # the second foreign key.
1811 if found:
1812 setattr(self, cache_attr, getattr(f, attr))
1813 break
1814 else:
1815 found = True
1816 elif link_field_name is None or link_field_name == f.name:
1817 setattr(self, cache_attr, getattr(f, attr))
1818 break
1819 return getattr(self, cache_attr)
1820
1821 def contribute_to_class(self, cls, name, **kwargs):
1822 # To support multiple relations to self, it's useful to have a non-None
1823 # related name on symmetrical relations for internal reasons. The
1824 # concept doesn't make a lot of sense externally ("you want me to
1825 # specify *what* on my non-reversible relation?!"), so we set it up
1826 # automatically. The funky name reduces the chance of an accidental
1827 # clash.
1828 if self.remote_field.symmetrical and (
1829 self.remote_field.model == RECURSIVE_RELATIONSHIP_CONSTANT
1830 or self.remote_field.model == cls._meta.object_name
1831 ):
1832 self.remote_field.related_name = "%s_rel_+" % name
1833 elif self.remote_field.is_hidden():
1834 # If the backwards relation is disabled, replace the original
1835 # related_name with one generated from the m2m field name. Django
1836 # still uses backwards relations internally and we need to avoid
1837 # clashes between multiple m2m fields with related_name == '+'.
1838 self.remote_field.related_name = "_{}_{}_{}_+".format(
1839 cls._meta.package_label,
1840 cls.__name__.lower(),
1841 name,
1842 )
1843
1844 super().contribute_to_class(cls, name, **kwargs)
1845
1846 # The intermediate m2m model is not auto created if:
1847 # 1) There is a manually specified intermediate, or
1848 # 2) The class owning the m2m field is abstract.
1849 # 3) The class owning the m2m field has been swapped out.
1850 if not cls._meta.abstract:
1851 if self.remote_field.through:
1852
1853 def resolve_through_model(_, model, field):
1854 field.remote_field.through = model
1855
1856 lazy_related_operation(
1857 resolve_through_model, cls, self.remote_field.through, field=self
1858 )
1859 elif not cls._meta.swapped:
1860 self.remote_field.through = create_many_to_many_intermediary_model(
1861 self, cls
1862 )
1863
1864 # Add the descriptor for the m2m relation.
1865 setattr(cls, self.name, ManyToManyDescriptor(self.remote_field, reverse=False))
1866
1867 # Set up the accessor for the m2m table name for the relation.
1868 self.m2m_db_table = partial(self._get_m2m_db_table, cls._meta)
1869
1870 def contribute_to_related_class(self, cls, related):
1871 # Internal M2Ms (i.e., those with a related name ending with '+')
1872 # and swapped models don't get a related descriptor.
1873 if (
1874 not self.remote_field.is_hidden()
1875 and not related.related_model._meta.swapped
1876 ):
1877 setattr(
1878 cls,
1879 related.get_accessor_name(),
1880 ManyToManyDescriptor(self.remote_field, reverse=True),
1881 )
1882
1883 # Set up the accessors for the column names on the m2m table.
1884 self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1885 self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1886
1887 self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1888 self.m2m_reverse_field_name = partial(
1889 self._get_m2m_reverse_attr, related, "name"
1890 )
1891
1892 get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1893 self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1894 get_m2m_reverse_rel = partial(
1895 self._get_m2m_reverse_attr, related, "remote_field"
1896 )
1897 self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1898
1899 def set_attributes_from_rel(self):
1900 pass
1901
1902 def value_from_object(self, obj):
1903 return [] if obj.pk is None else list(getattr(obj, self.attname).all())
1904
1905 def save_form_data(self, instance, data):
1906 getattr(instance, self.attname).set(data)
1907
1908 def db_check(self, connection):
1909 return None
1910
1911 def db_type(self, connection):
1912 # A ManyToManyField is not represented by a single column,
1913 # so return None.
1914 return None
1915
1916 def db_parameters(self, connection):
1917 return {"type": None, "check": None}