1from __future__ import annotations
2
3import copy
4import operator
5from collections.abc import Callable, Sequence
6from functools import cached_property, partial
7from typing import TYPE_CHECKING, Any, Self, cast
8
9from plain import exceptions
10from plain.postgres.constants import LOOKUP_SEP
11from plain.postgres.deletion import NO_ACTION, SET_NULL, OnDelete
12from plain.postgres.exceptions import FieldDoesNotExist, FieldError
13from plain.postgres.query_utils import PathInfo, Q
14from plain.postgres.utils import make_model_tuple
15from plain.preflight import PreflightResult
16
17from ..registry import models_registry
18from . import BLANK_CHOICE_DASH, Field
19from .base import ColumnField
20from .mixins import FieldCacheMixin
21from .related_descriptors import (
22 ForwardForeignKeyDescriptor,
23 ForwardManyToManyDescriptor,
24)
25from .related_lookups import (
26 RelatedExact,
27 RelatedGreaterThan,
28 RelatedGreaterThanOrEqual,
29 RelatedIn,
30 RelatedIsNull,
31 RelatedLessThan,
32 RelatedLessThanOrEqual,
33)
34from .reverse_related import ForeignKeyRel, ManyToManyRel
35
36if TYPE_CHECKING:
37 from plain.postgres.base import Model
38 from plain.postgres.connection import DatabaseConnection
39 from plain.postgres.fields.reverse_related import ForeignObjectRel
40
41RECURSIVE_RELATIONSHIP_CONSTANT = "self"
42
43
44def resolve_relation(
45 scope_model: type[Model], relation: type[Model] | str
46) -> type[Model] | str:
47 """
48 Transform relation into a model or fully-qualified model string of the form
49 "package_label.ModelName", relative to scope_model.
50
51 The relation argument can be:
52 * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
53 the model argument will be returned.
54 * A bare model name without an package_label, in which case scope_model's
55 package_label will be prepended.
56 * An "package_label.ModelName" string.
57 * A model class, which will be returned unchanged.
58 """
59 # Check for recursive relations
60 if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
61 relation = scope_model
62
63 # Look for an "app.Model" relation
64 if isinstance(relation, str):
65 if "." not in relation:
66 relation = f"{scope_model.model_options.package_label}.{relation}"
67
68 return relation
69
70
71def lazy_related_operation(
72 function: Any, model: type[Model], *related_models: type[Model] | str, **kwargs: Any
73) -> None:
74 """
75 Schedule `function` to be called once `model` and all `related_models`
76 have been imported and registered with the app registry. `function` will
77 be called with the newly-loaded model classes as its positional arguments,
78 plus any optional keyword arguments.
79
80 The `model` argument must be a model class. Each subsequent positional
81 argument is another model, or a reference to another model - see
82 `resolve_relation()` for the various forms these may take. Any relative
83 references will be resolved relative to `model`.
84
85 This is a convenience wrapper for `Packages.lazy_model_operation` - the app
86 registry model used is the one found in `model._model_meta.models_registry`.
87 """
88 models = [model] + [resolve_relation(model, rel) for rel in related_models]
89 model_keys = (make_model_tuple(m) for m in models)
90 models_registry = model._model_meta.models_registry
91 return models_registry.lazy_model_operation(
92 partial(function, **kwargs), *model_keys
93 )
94
95
96class RelatedField(FieldCacheMixin, Field):
97 """Base class that all relational fields inherit from."""
98
99 non_migration_attrs = (
100 *Field.non_migration_attrs,
101 "limit_choices_to",
102 "related_query_name",
103 )
104
105 # RelatedField always has a remote_field (never None)
106 remote_field: ForeignObjectRel
107 # path_infos is implemented as @cached_property in subclasses (ForeignKey, ManyToManyField)
108 path_infos: list[PathInfo]
109 # Set by ForeignKeyField / ManyToManyField in their __init__; declared
110 # here so RelatedField methods (deconstruct, related_query_name) can
111 # reference them without isinstance-narrowing.
112 _related_query_name: str | None
113 _limit_choices_to: Any
114
115 # No __init__: ForeignKeyField and ManyToManyField each set
116 # _related_query_name, _limit_choices_to, and remote_field themselves.
117
118 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
119 # Handle remote_field deepcopy for RelatedFields
120 obj = super().__deepcopy__(memodict)
121 obj.remote_field = copy.copy(self.remote_field)
122 if hasattr(self.remote_field, "field") and self.remote_field.field is self:
123 obj.remote_field.field = obj # ty: ignore[invalid-assignment]
124 return obj
125
126 @cached_property
127 def related_model(self) -> type[Model]:
128 # Can't cache this property until all the models are loaded.
129 models_registry.check_ready()
130 return self.remote_field.model
131
132 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
133 return [
134 *super().preflight(**kwargs),
135 *self._check_related_query_name_is_valid(),
136 *self._check_relation_model_exists(),
137 *self._check_clashes(),
138 ]
139
140 def _check_related_query_name_is_valid(self) -> list[PreflightResult]:
141 # Always validate related_query_name since it's still used for ORM queries
142 # (e.g., User.query.filter(articles__title="..."))
143 rel_query_name = self.related_query_name()
144 errors: list[PreflightResult] = []
145 if rel_query_name.endswith("_"):
146 errors.append(
147 PreflightResult(
148 fix=(
149 f"Reverse query name '{rel_query_name}' must not end with an underscore. "
150 "Use a different related_query_name."
151 ),
152 obj=self,
153 id="fields.related_field_accessor_clash",
154 )
155 )
156 if LOOKUP_SEP in rel_query_name:
157 errors.append(
158 PreflightResult(
159 fix=(
160 f"Reverse query name '{rel_query_name}' must not contain '{LOOKUP_SEP}'. "
161 "Use a different related_query_name."
162 ),
163 obj=self,
164 id="fields.related_field_query_name_clash",
165 )
166 )
167 return errors
168
169 def _check_relation_model_exists(self) -> list[PreflightResult]:
170 rel_is_missing = (
171 self.remote_field.model not in self.meta.models_registry.get_models()
172 )
173 rel_is_string = isinstance(self.remote_field.model, str)
174 model_name = (
175 self.remote_field.model
176 if rel_is_string
177 else self.remote_field.model.model_options.object_name
178 )
179 if rel_is_missing and rel_is_string:
180 return [
181 PreflightResult(
182 fix=(
183 f"Field defines a relation with model '{model_name}', which is not "
184 "installed. Ensure the model's package is registered."
185 ),
186 obj=self,
187 id="fields.related_model_not_installed",
188 )
189 ]
190 return []
191
192 def _check_clashes(self) -> list[PreflightResult]:
193 """Check accessor and reverse query name clashes."""
194 from plain.postgres.base import ModelBase
195
196 errors: list[PreflightResult] = []
197
198 # f.remote_field.model may be a string instead of a model. Skip if
199 # model name is not resolved.
200 if not isinstance(self.remote_field.model, ModelBase):
201 return []
202
203 # Consider that we are checking field `Model.foreign` and the models
204 # are:
205 #
206 # class Target(models.Model):
207 # model = models.IntegerField()
208 # model_set = models.IntegerField()
209 #
210 # class Model(models.Model):
211 # foreign = models.ForeignKeyField(Target)
212 # m2m = models.ManyToManyField(Target)
213
214 # rel_options.object_name == "Target"
215 rel_meta = self.remote_field.model._model_meta
216 rel_options = self.remote_field.model.model_options
217 rel_query_name = self.related_query_name() # i. e. "model"
218 # i.e. "package_label.Model.field".
219 field_name = f"{self.model.model_options.label}.{self.name}"
220
221 # Check clashes between reverse query name of `field`
222 # and any other field name.
223 potential_clashes = rel_meta.fields + rel_meta.many_to_many
224 for clash_field in potential_clashes:
225 # i.e. "package_label.Target.model_set".
226 clash_name = f"{rel_options.label}.{clash_field.name}"
227 if clash_field.name == rel_query_name:
228 errors.append(
229 PreflightResult(
230 fix=(
231 f"Reverse query name for '{field_name}' clashes with field name '{clash_name}'. "
232 f"Rename field '{clash_name}' or use a different related_query_name."
233 ),
234 obj=self,
235 id="fields.related_accessor_clash_manager",
236 )
237 )
238
239 return errors
240
241 def db_type(self) -> str | None:
242 # By default related field will not have a column as it relates to
243 # columns from another table.
244 return None
245
246 def unqualified_db_type(self) -> str | None:
247 return self.rel_db_type()
248
249 def contribute_to_class(self, cls: type[Model], name: str) -> None:
250 super().contribute_to_class(cls, name)
251
252 self.meta = cls._model_meta
253
254 if self.remote_field.related_query_name:
255 related_query_name = self.remote_field.related_query_name % {
256 "class": cls.__name__.lower(),
257 "package_label": cls.model_options.package_label.lower(),
258 }
259 self.remote_field.related_query_name = related_query_name
260
261 def resolve_related_class(
262 model: type[Model], related: type[Model], field: RelatedField
263 ) -> None:
264 field.remote_field.model = related
265 field.do_related_class(related, model)
266
267 lazy_related_operation(
268 resolve_related_class,
269 cls,
270 self.remote_field.model,
271 field=self,
272 )
273
274 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
275 name, path, args, kwargs = super().deconstruct()
276 if self._limit_choices_to:
277 kwargs["limit_choices_to"] = self._limit_choices_to
278 if self._related_query_name is not None:
279 kwargs["related_query_name"] = self._related_query_name
280 return name, path, args, kwargs
281
282 def set_attributes_from_rel(self) -> None:
283 self.name = self.name or (
284 self.remote_field.model.model_options.model_name + "_" + "id"
285 )
286 self.remote_field.set_field_name()
287
288 def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
289 self.set_attributes_from_rel()
290
291 def get_limit_choices_to(self) -> Any:
292 """
293 Return ``limit_choices_to`` for this model field.
294
295 If it is a callable, it will be invoked and the result will be
296 returned.
297 """
298 if callable(self.remote_field.limit_choices_to):
299 return self.remote_field.limit_choices_to() # ty: ignore[call-top-callable]
300 return self.remote_field.limit_choices_to
301
302 def get_choices(
303 self,
304 include_blank: bool = True,
305 blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
306 limit_choices_to: Any = None,
307 ordering: tuple[str, ...] = (),
308 ) -> list[tuple[Any, str]]:
309 """Return choices from the related model, for use as <select> options."""
310 rel_model = self.remote_field.model
311 if rel_model is None:
312 return blank_choice if include_blank else []
313 limit_choices_to = limit_choices_to or self.get_limit_choices_to()
314 get_related_field = getattr(self.remote_field, "get_related_field", None)
315 related_field_name = (
316 get_related_field().attname if get_related_field is not None else "id"
317 )
318 choice_func = operator.attrgetter(related_field_name)
319 qs = rel_model.query.complex_filter(limit_choices_to)
320 if ordering:
321 qs = qs.order_by(*ordering)
322 return (blank_choice if include_blank else []) + [
323 (choice_func(x), str(x)) for x in qs
324 ]
325
326 def related_query_name(self) -> str:
327 """
328 Define the name that can be used to identify this related object in a
329 table-spanning query.
330 """
331 return (
332 self.remote_field.related_query_name or self.model.model_options.model_name
333 )
334
335 @property
336 def target_field(self) -> Field:
337 """
338 When filtering against this relation, return the field on the remote
339 model against which the filtering should happen.
340 """
341 target_fields = self.path_infos[-1].target_fields
342 if len(target_fields) > 1:
343 raise FieldError(
344 "The relation has multiple target fields, but only single target field "
345 "was asked for"
346 )
347 return target_fields[0]
348
349 def get_cache_name(self) -> str:
350 assert self.name is not None, "Field name must be set"
351 return self.name
352
353
354class ForeignKeyField(ColumnField, RelatedField):
355 # Narrow the base class's `ForeignObjectRel` annotation — a FK's remote_field
356 # is always a ForeignKeyRel with a concrete on_delete action.
357 remote_field: ForeignKeyRel
358
359 """
360 Provide a many-to-one relation by adding a column to the local model
361 to hold the remote value.
362
363 ForeignKeyField targets the primary key (id) of the remote model.
364 """
365
366 non_migration_attrs = (
367 *RelatedField.non_migration_attrs,
368 *ColumnField.non_migration_attrs,
369 "on_delete",
370 )
371
372 empty_strings_allowed = False
373
374 def __init__(
375 self,
376 to: type[Model] | str,
377 on_delete: OnDelete,
378 related_query_name: str | None = None,
379 limit_choices_to: Any = None,
380 db_constraint: bool = True,
381 *,
382 required: bool = True,
383 allow_null: bool = False,
384 validators: Sequence[Callable[..., Any]] = (),
385 ):
386 # `default` and `choices` are intentionally not accepted: a hardcoded
387 # FK id default is a portability/existence footgun, and the related
388 # model itself already defines the valid set. Use `limit_choices_to`
389 # to constrain the target rows.
390 if not isinstance(to, str):
391 try:
392 to.model_options.model_name
393 except AttributeError:
394 raise TypeError(
395 f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ForeignKeyField must be "
396 f"either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
397 )
398 if not isinstance(on_delete, OnDelete):
399 raise TypeError(
400 "on_delete must be one of plain.postgres.CASCADE, SET_NULL, "
401 f"RESTRICT, or NO_ACTION; got {on_delete!r}"
402 )
403
404 super().__init__(
405 required=required,
406 allow_null=allow_null,
407 validators=validators,
408 )
409 self._related_query_name = related_query_name
410 self._limit_choices_to = limit_choices_to
411 self.remote_field = ForeignKeyRel(
412 field=self,
413 to=to,
414 on_delete=on_delete,
415 related_query_name=related_query_name,
416 limit_choices_to=limit_choices_to,
417 )
418 self.db_constraint = db_constraint
419
420 def __copy__(self) -> ForeignKeyField:
421 obj = super().__copy__()
422 # Remove any cached PathInfo values.
423 obj.__dict__.pop("path_infos", None)
424 obj.__dict__.pop("reverse_path_infos", None)
425 return obj
426
427 def __set__(self, instance: Any, value: Any) -> None:
428 """
429 Override Field's __set__ to clear cached related object when FK value changes.
430
431 This ensures that when you change obj.user_id, the cached obj.user is invalidated.
432 """
433 # Check if value is changing and clear cache if needed
434 if (
435 hasattr(self, "attname")
436 and instance.__dict__.get(self.attname) != value
437 and self.is_cached(instance)
438 ):
439 self.delete_cached_value(instance)
440
441 # Call parent's __set__ to do the actual assignment
442 super().__set__(instance, value)
443
444 @cached_property
445 def related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
446 return self.resolve_related_fields()
447
448 @cached_property
449 def reverse_related_fields(self) -> list[tuple[Field, Field]]:
450 return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]
451
452 @cached_property
453 def local_related_fields(self) -> tuple[Field, ...]:
454 return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)
455
456 @cached_property
457 def foreign_related_fields(self) -> tuple[Field, ...]:
458 return tuple(
459 rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
460 )
461
462 def get_forward_related_filter(self, obj: Model) -> dict[str, Any]:
463 """
464 Return the keyword arguments that when supplied to
465 self.model.object.filter(), would select all instances related through
466 this field to the remote obj. This is used to build the querysets
467 returned by related descriptors. obj is an instance of
468 self.related_field.model.
469 """
470 return {
471 f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
472 for _, rh_field in self.related_fields
473 }
474
475 def get_reverse_related_filter(self, obj: Model) -> Q:
476 """
477 Complement to get_forward_related_filter(). Return the keyword
478 arguments that when passed to self.related_field.model.object.filter()
479 select all instances of self.related_field.model related through
480 this field to obj. obj is an instance of self.model.
481 """
482 return Q.create(
483 [
484 (rh_field.attname, getattr(obj, lh_field.attname))
485 for lh_field, rh_field in self.related_fields
486 ]
487 )
488
489 def get_local_related_value(self, instance: Model) -> tuple[Any, ...]:
490 # Always returns the value of the single local field
491 field = self.local_related_fields[0]
492 if field.primary_key:
493 return (instance.id,)
494 return (getattr(instance, field.attname),)
495
496 def get_foreign_related_value(self, instance: Model) -> tuple[Any, ...]:
497 # Always returns the id of the foreign instance
498 return (instance.id,)
499
500 def get_joining_columns(
501 self, reverse_join: bool = False
502 ) -> tuple[tuple[str, str], ...]:
503 # Always returns a single column pair
504 if reverse_join:
505 from_field, to_field = self.related_fields[0]
506 return ((to_field.column, from_field.column),)
507 else:
508 from_field, to_field = self.related_fields[0]
509 return ((from_field.column, to_field.column),)
510
511 def get_reverse_joining_columns(self) -> tuple[tuple[str, str], ...]:
512 return self.get_joining_columns(reverse_join=True)
513
514 def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
515 """Get path from this field to the related model."""
516 meta = self.remote_field.model._model_meta
517 from_meta = self.model._model_meta
518 return [
519 PathInfo(
520 from_meta=from_meta,
521 to_meta=meta,
522 target_fields=self.foreign_related_fields,
523 join_field=self,
524 m2m=False,
525 direct=True,
526 filtered_relation=filtered_relation,
527 )
528 ]
529
530 @cached_property
531 def path_infos(self) -> list[PathInfo]:
532 return self.get_path_info()
533
534 def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
535 """Get path from the related model to this field's model."""
536 meta = self.model._model_meta
537 from_meta = self.remote_field.model._model_meta
538 return [
539 PathInfo(
540 from_meta=from_meta,
541 to_meta=meta,
542 target_fields=(meta.get_forward_field("id"),),
543 join_field=self.remote_field,
544 m2m=not self.primary_key,
545 direct=False,
546 filtered_relation=filtered_relation,
547 )
548 ]
549
550 @cached_property
551 def reverse_path_infos(self) -> list[PathInfo]:
552 return self.get_reverse_path_info()
553
554 def contribute_to_class(self, cls: type[Model], name: str) -> None:
555 super().contribute_to_class(cls, name)
556 setattr(cls, name, ForwardForeignKeyDescriptor(self))
557
558 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
559 return [
560 *super().preflight(**kwargs),
561 *self._check_on_delete(),
562 ]
563
564 def _check_on_delete(self) -> list[PreflightResult]:
565 on_delete = getattr(self.remote_field, "on_delete", None)
566 results: list[PreflightResult] = []
567 if on_delete is SET_NULL and not self.allow_null:
568 results.append(
569 PreflightResult(
570 fix=(
571 "Field specifies on_delete=SET_NULL, but cannot be null. "
572 "Set allow_null=True argument on the field, or change the on_delete rule."
573 ),
574 obj=self,
575 id="fields.foreign_key_null_constraint_violation",
576 )
577 )
578 if not self.db_constraint and on_delete is not NO_ACTION:
579 results.append(
580 PreflightResult(
581 fix=(
582 "db_constraint=False requires on_delete=NO_ACTION. "
583 "Without a FOREIGN KEY constraint there is no place for "
584 "Postgres to apply the delete behavior."
585 ),
586 obj=self,
587 id="fields.foreign_key_unconstrained_requires_no_action",
588 )
589 )
590 return results
591
592 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
593 name, path, args, kwargs = super().deconstruct()
594 kwargs["on_delete"] = self.remote_field.on_delete
595
596 if isinstance(self.remote_field.model, str):
597 if "." in self.remote_field.model:
598 package_label, model_name = self.remote_field.model.split(".")
599 kwargs["to"] = f"{package_label}.{model_name.lower()}"
600 else:
601 kwargs["to"] = self.remote_field.model.lower()
602 else:
603 kwargs["to"] = self.remote_field.model.model_options.label_lower
604
605 if self.db_constraint is not True:
606 kwargs["db_constraint"] = self.db_constraint
607
608 return name, path, args, kwargs
609
610 def to_python(self, value: Any) -> Any:
611 return self.target_field.to_python(value)
612
613 @property
614 def target_field(self) -> Field:
615 return self.foreign_related_fields[0]
616
617 def validate(self, value: Any, model_instance: Model) -> None:
618 super().validate(value, model_instance)
619 if value is None:
620 return None
621
622 field_name = self.remote_field.field_name
623 if field_name is None:
624 raise ValueError("remote_field.field_name cannot be None")
625 qs = self.remote_field.model._model_meta.base_queryset.filter(
626 **{field_name: value}
627 )
628 qs = qs.complex_filter(self.get_limit_choices_to())
629 if not qs.exists():
630 raise exceptions.ValidationError(
631 "%(model)s instance with %(field)s %(value)r does not exist.",
632 code="invalid",
633 params={
634 "model": self.remote_field.model.model_options.model_name,
635 "id": value,
636 "field": self.remote_field.field_name,
637 "value": value,
638 },
639 )
640
641 def resolve_related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
642 if isinstance(self.remote_field.model, str):
643 raise ValueError(
644 f"Related model {self.remote_field.model!r} cannot be resolved"
645 )
646 from_field = self
647 to_field = self.remote_field.model._model_meta.get_forward_field("id")
648 related_fields: list[tuple[ForeignKeyField, Field]] = [(from_field, to_field)]
649
650 for from_field, to_field in related_fields:
651 if to_field and to_field.model != self.remote_field.model:
652 raise FieldError(
653 f"'{self.model.model_options.label}.{self.name}' refers to field '{to_field.name}' which is not local to model "
654 f"'{self.remote_field.model.model_options.label}'."
655 )
656 return related_fields
657
658 def get_attname(self) -> str:
659 return f"{self.name}_id"
660
661 def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
662 if value is None or (
663 value == "" and not self.target_field.empty_strings_allowed
664 ):
665 return None
666 else:
667 return self.target_field.get_db_prep_save(value, connection=connection)
668
669 def get_db_prep_value(
670 self, value: Any, connection: DatabaseConnection, prepared: bool = False
671 ) -> Any:
672 return self.target_field.get_db_prep_value(value, connection, prepared)
673
674 def get_prep_value(self, value: Any) -> Any:
675 return self.target_field.get_prep_value(value)
676
677 def db_type(self) -> str | None:
678 return self.target_field.rel_db_type()
679
680 def cast_db_type(self) -> str | None:
681 return self.target_field.cast_db_type()
682
683 def get_col(self, alias: str | None, output_field: Field | None = None) -> Any:
684 if output_field is None:
685 output_field = self.target_field
686 while isinstance(output_field, ForeignKeyField):
687 output_field = output_field.target_field
688 if output_field is self:
689 raise ValueError("Cannot resolve output_field.")
690 return super().get_col(alias, output_field)
691
692
693# Register lookups for ForeignKey
694ForeignKeyField.register_lookup(RelatedIn)
695ForeignKeyField.register_lookup(RelatedExact)
696ForeignKeyField.register_lookup(RelatedLessThan)
697ForeignKeyField.register_lookup(RelatedGreaterThan)
698ForeignKeyField.register_lookup(RelatedGreaterThanOrEqual)
699ForeignKeyField.register_lookup(RelatedLessThanOrEqual)
700ForeignKeyField.register_lookup(RelatedIsNull)
701
702
703class ManyToManyField(RelatedField):
704 """
705 Provide a many-to-many relation by using an intermediary model that
706 holds two ForeignKeyField fields pointed at the two sides of the relation.
707
708 Unless a ``through`` model was provided, ManyToManyField will use the
709 create_many_to_many_intermediary_model factory to automatically generate
710 the intermediary model.
711 """
712
713 # ManyToManyField uses ManyToManyRel which has through/through_fields
714 remote_field: ManyToManyRel
715
716 def __init__(
717 self,
718 to: type[Model] | str,
719 *,
720 through: type[Model] | str,
721 through_fields: tuple[str, str] | None = None,
722 related_query_name: str | None = None,
723 limit_choices_to: Any = None,
724 symmetrical: bool | None = None,
725 ):
726 # M2M has no database column, so `required`, `allow_null`, `default`,
727 # `validators`, and `choices` are intentionally not accepted. Membership
728 # is managed through the related manager; filter target rows with
729 # `limit_choices_to`.
730 if not isinstance(to, str):
731 try:
732 to._model_meta
733 except AttributeError:
734 raise TypeError(
735 f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ManyToManyField "
736 f"must be either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
737 )
738
739 if symmetrical is None:
740 symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
741
742 if not through:
743 raise ValueError("ManyToManyField must have a 'through' argument.")
744
745 self.remote_field = ManyToManyRel(
746 field=self,
747 to=to,
748 related_query_name=related_query_name,
749 limit_choices_to=limit_choices_to,
750 symmetrical=symmetrical,
751 through=through,
752 through_fields=through_fields,
753 )
754
755 super().__init__()
756 self._related_query_name = related_query_name
757 self._limit_choices_to = limit_choices_to
758
759 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
760 return [
761 *super().preflight(**kwargs),
762 *self._check_relationship_model(**kwargs),
763 *self._check_table_uniqueness(**kwargs),
764 ]
765
766 def _check_relationship_model(
767 self, from_model: type[Model] | None = None, **kwargs: Any
768 ) -> list[PreflightResult]:
769 if hasattr(self.remote_field.through, "_model_meta"):
770 qualified_model_name = f"{self.remote_field.through.model_options.package_label}.{self.remote_field.through.__name__}"
771 else:
772 qualified_model_name = self.remote_field.through
773
774 errors = []
775
776 if self.remote_field.through not in self.meta.models_registry.get_models():
777 # The relationship model is not installed.
778 errors.append(
779 PreflightResult(
780 fix=(
781 "Field specifies a many-to-many relation through model "
782 f"'{qualified_model_name}', which has not been installed. "
783 "Ensure the through model is properly defined and installed."
784 ),
785 obj=self,
786 id="fields.m2m_through_model_not_installed",
787 )
788 )
789
790 else:
791 assert from_model is not None, (
792 "ManyToManyField with intermediate "
793 "tables cannot be checked if you don't pass the model "
794 "where the field is attached to."
795 )
796 # Set some useful local variables
797 to_model = resolve_relation(from_model, self.remote_field.model)
798 from_model_name = from_model.model_options.object_name
799 if isinstance(to_model, str):
800 to_model_name = to_model
801 else:
802 to_model_name = to_model.model_options.object_name
803 relationship_model_name = (
804 self.remote_field.through.model_options.object_name
805 )
806 self_referential = from_model == to_model
807 # Count foreign keys in intermediate model
808 if self_referential:
809 seen_self = sum(
810 from_model == field.remote_field.model
811 for field in self.remote_field.through._model_meta.fields
812 if isinstance(field, RelatedField)
813 )
814
815 if seen_self > 2 and not self.remote_field.through_fields:
816 errors.append(
817 PreflightResult(
818 fix=(
819 "The model is used as an intermediate model by "
820 f"'{self}', but it has more than two foreign keys "
821 f"to '{from_model_name}', which is ambiguous. "
822 "Use through_fields to specify which two foreign keys "
823 "Plain should use."
824 ),
825 obj=self.remote_field.through,
826 id="fields.m2m_through_model_ambiguous_fks",
827 )
828 )
829
830 else:
831 # Count foreign keys in relationship model
832 seen_from = sum(
833 from_model == field.remote_field.model
834 for field in self.remote_field.through._model_meta.fields
835 if isinstance(field, RelatedField)
836 )
837 seen_to = sum(
838 to_model == field.remote_field.model
839 for field in self.remote_field.through._model_meta.fields
840 if isinstance(field, RelatedField)
841 )
842
843 if seen_from > 1 and not self.remote_field.through_fields:
844 errors.append(
845 PreflightResult(
846 fix=(
847 "The model is used as an intermediate model by "
848 f"'{self}', but it has more than one foreign key "
849 f"from '{from_model_name}', which is ambiguous. You must specify "
850 "which foreign key Plain should use via the "
851 "through_fields keyword argument. "
852 "If you want to create a recursive relationship, "
853 f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
854 ),
855 obj=self,
856 id="fields.m2m_through_model_invalid_recursive_from",
857 )
858 )
859
860 if seen_to > 1 and not self.remote_field.through_fields:
861 errors.append(
862 PreflightResult(
863 fix=(
864 "The model is used as an intermediate model by "
865 f"'{self}', but it has more than one foreign key "
866 f"to '{to_model_name}', which is ambiguous. You must specify "
867 "which foreign key Plain should use via the "
868 "through_fields keyword argument. "
869 "If you want to create a recursive relationship, "
870 f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
871 ),
872 obj=self,
873 id="fields.m2m_through_model_invalid_recursive_to",
874 )
875 )
876
877 if seen_from == 0 or seen_to == 0:
878 errors.append(
879 PreflightResult(
880 fix=(
881 "The model is used as an intermediate model by "
882 f"'{self}', but it does not have a foreign key to '{from_model_name}' or '{to_model_name}'. "
883 "Add the required foreign keys to the through model."
884 ),
885 obj=self.remote_field.through,
886 id="fields.m2m_through_model_missing_fk",
887 )
888 )
889
890 # Validate `through_fields`.
891 if self.remote_field.through_fields is not None:
892 # Validate that we're given an iterable of at least two items
893 # and that none of them is "falsy".
894 if not (
895 len(self.remote_field.through_fields) >= 2
896 and self.remote_field.through_fields[0]
897 and self.remote_field.through_fields[1]
898 ):
899 errors.append(
900 PreflightResult(
901 fix=(
902 "Field specifies 'through_fields' but does not provide "
903 "the names of the two link fields that should be used "
904 f"for the relation through model '{qualified_model_name}'. "
905 "Make sure you specify 'through_fields' as "
906 "through_fields=('field1', 'field2')."
907 ),
908 obj=self,
909 id="fields.m2m_through_fields_wrong_length",
910 )
911 )
912
913 # Validate the given through fields -- they should be actual
914 # fields on the through model, and also be foreign keys to the
915 # expected models.
916 else:
917 assert from_model is not None, (
918 "ManyToManyField with intermediate "
919 "tables cannot be checked if you don't pass the model "
920 "where the field is attached to."
921 )
922
923 source, through, target = (
924 from_model,
925 self.remote_field.through,
926 self.remote_field.model,
927 )
928 source_field_name, target_field_name = self.remote_field.through_fields[
929 :2
930 ]
931
932 for field_name, related_model in (
933 (source_field_name, source),
934 (target_field_name, target),
935 ):
936 possible_field_names = []
937 for f in through._model_meta.fields:
938 if (
939 hasattr(f, "remote_field")
940 and getattr(f.remote_field, "model", None) == related_model
941 ):
942 possible_field_names.append(f.name)
943 if possible_field_names:
944 fix = (
945 "Did you mean one of the following foreign keys to '{}': "
946 "{}?".format(
947 related_model.model_options.object_name,
948 ", ".join(possible_field_names),
949 )
950 )
951 else:
952 fix = ""
953
954 try:
955 field = through._model_meta.get_forward_field(field_name)
956 except FieldDoesNotExist:
957 errors.append(
958 PreflightResult(
959 fix=f"The intermediary model '{qualified_model_name}' has no field '{field_name}'. {fix}",
960 obj=self,
961 id="fields.m2m_through_field_not_found",
962 )
963 )
964 else:
965 if not (
966 isinstance(field, RelatedField)
967 and field.remote_field.model == related_model
968 ):
969 errors.append(
970 PreflightResult(
971 fix=f"'{through.model_options.object_name}.{field_name}' is not a foreign key to '{related_model.model_options.object_name}'. {fix}",
972 obj=self,
973 id="fields.m2m_through_field_not_fk_to_model",
974 )
975 )
976
977 return errors
978
979 def _check_table_uniqueness(self, **kwargs: Any) -> list[PreflightResult]:
980 if isinstance(self.remote_field.through, str):
981 return []
982 registered_tables = {
983 model.model_options.db_table: model
984 for model in self.meta.models_registry.get_models()
985 if model != self.remote_field.through
986 }
987 m2m_db_table = self.m2m_db_table()
988 model = registered_tables.get(m2m_db_table)
989 # Check if there's already a m2m field using the same through model.
990 if model and model != self.remote_field.through:
991 clashing_obj = model.model_options.label
992 return [
993 PreflightResult(
994 fix=(
995 f"The field's intermediary table '{m2m_db_table}' clashes with the "
996 f"table name of '{clashing_obj}'. "
997 "Change the through model's db_table or use a different model."
998 ),
999 obj=self,
1000 id="fields.m2m_table_name_clash",
1001 )
1002 ]
1003 return []
1004
1005 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1006 name, path, args, kwargs = super().deconstruct()
1007
1008 if self.remote_field.db_constraint is not True:
1009 kwargs["db_constraint"] = self.remote_field.db_constraint
1010
1011 # Lowercase model names as they should be treated as case-insensitive.
1012 if isinstance(self.remote_field.model, str):
1013 if "." in self.remote_field.model:
1014 package_label, model_name = self.remote_field.model.split(".")
1015 kwargs["to"] = f"{package_label}.{model_name.lower()}"
1016 else:
1017 kwargs["to"] = self.remote_field.model.lower()
1018 else:
1019 kwargs["to"] = self.remote_field.model.model_options.label_lower
1020
1021 if isinstance(self.remote_field.through, str):
1022 kwargs["through"] = self.remote_field.through
1023 else:
1024 kwargs["through"] = self.remote_field.through.model_options.label
1025
1026 return name, path, args, kwargs
1027
1028 def _get_path_info(
1029 self, direct: bool = False, filtered_relation: Any = None
1030 ) -> list[PathInfo]:
1031 """Called by both direct and indirect m2m traversal."""
1032 int_model = self.remote_field.through
1033 # M2M through model fields are always ForeignKey
1034 linkfield1 = cast(
1035 ForeignKeyField,
1036 int_model._model_meta.get_forward_field(self.m2m_field_name()),
1037 )
1038 linkfield2 = cast(
1039 ForeignKeyField,
1040 int_model._model_meta.get_forward_field(self.m2m_reverse_field_name()),
1041 )
1042 if direct:
1043 join1infos = linkfield1.reverse_path_infos
1044 if filtered_relation:
1045 join2infos = linkfield2.get_path_info(filtered_relation)
1046 else:
1047 join2infos = linkfield2.path_infos
1048 else:
1049 join1infos = linkfield2.reverse_path_infos
1050 if filtered_relation:
1051 join2infos = linkfield1.get_path_info(filtered_relation)
1052 else:
1053 join2infos = linkfield1.path_infos
1054
1055 return [*join1infos, *join2infos]
1056
1057 def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1058 return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1059
1060 @cached_property
1061 def path_infos(self) -> list[PathInfo]:
1062 return self.get_path_info()
1063
1064 def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1065 return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1066
1067 @cached_property
1068 def reverse_path_infos(self) -> list[PathInfo]:
1069 return self.get_reverse_path_info()
1070
1071 def _get_m2m_db_table(self) -> str:
1072 """
1073 Function that can be curried to provide the m2m table name for this
1074 relation.
1075 """
1076 return self.remote_field.through.model_options.db_table
1077
1078 def _get_m2m_attr(self, related: Any, attr: str) -> Any:
1079 """
1080 Function that can be curried to provide the source accessor or DB
1081 column name for the m2m table.
1082 """
1083 cache_attr = f"_m2m_{attr}_cache"
1084 if hasattr(self, cache_attr):
1085 return getattr(self, cache_attr)
1086 if self.remote_field.through_fields is not None:
1087 link_field_name: str | None = self.remote_field.through_fields[0]
1088 else:
1089 link_field_name = None
1090 for f in self.remote_field.through._model_meta.fields:
1091 if (
1092 isinstance(f, RelatedField)
1093 and f.remote_field.model == related.related_model
1094 and (link_field_name is None or link_field_name == f.name)
1095 ):
1096 setattr(self, cache_attr, getattr(f, attr))
1097 return getattr(self, cache_attr)
1098 return None
1099
1100 def _get_m2m_reverse_attr(self, related: Any, attr: str) -> Any:
1101 """
1102 Function that can be curried to provide the related accessor or DB
1103 column name for the m2m table.
1104 """
1105 cache_attr = f"_m2m_reverse_{attr}_cache"
1106 if hasattr(self, cache_attr):
1107 return getattr(self, cache_attr)
1108 found = False
1109 if self.remote_field.through_fields is not None:
1110 link_field_name: str | None = self.remote_field.through_fields[1]
1111 else:
1112 link_field_name = None
1113 for f in self.remote_field.through._model_meta.fields:
1114 if isinstance(f, RelatedField) and f.remote_field.model == related.model:
1115 if link_field_name is None and related.related_model == related.model:
1116 # If this is an m2m-intermediate to self,
1117 # the first foreign key you find will be
1118 # the source column. Keep searching for
1119 # the second foreign key.
1120 if found:
1121 setattr(self, cache_attr, getattr(f, attr))
1122 break
1123 else:
1124 found = True
1125 elif link_field_name is None or link_field_name == f.name:
1126 setattr(self, cache_attr, getattr(f, attr))
1127 break
1128 return getattr(self, cache_attr)
1129
1130 def contribute_to_class(self, cls: type[Model], name: str) -> None:
1131 super().contribute_to_class(cls, name)
1132
1133 def resolve_through_model(
1134 _: Any, model: type[Model], field: ManyToManyField
1135 ) -> None:
1136 field.remote_field.through = model
1137
1138 lazy_related_operation(
1139 resolve_through_model,
1140 cls,
1141 self.remote_field.through,
1142 field=self,
1143 )
1144
1145 # Add the descriptor for the m2m relation.
1146 setattr(cls, self.name, ForwardManyToManyDescriptor(self.remote_field)) # ty: ignore[invalid-argument-type]
1147
1148 # Set up the accessor for the m2m table name for the relation.
1149 self.m2m_db_table = self._get_m2m_db_table
1150
1151 def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
1152 """Set up M2M metadata accessors for the through table."""
1153 super().do_related_class(other, cls)
1154
1155 # Set up the accessors for the column names on the m2m table.
1156 # These are used during query construction and schema operations.
1157 related = self.remote_field
1158 self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1159 self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1160
1161 self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1162 self.m2m_reverse_field_name = partial(
1163 self._get_m2m_reverse_attr, related, "name"
1164 )
1165
1166 get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1167 self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1168 get_m2m_reverse_rel = partial(
1169 self._get_m2m_reverse_attr, related, "remote_field"
1170 )
1171 self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1172
1173 def set_attributes_from_rel(self) -> None:
1174 pass
1175
1176 def value_from_object(self, obj: Model) -> list[Any]:
1177 return [] if obj.id is None else list(getattr(obj, self.attname).query)
1178
1179 def save_form_data(self, instance: Model, data: Any) -> None:
1180 getattr(instance, self.attname).set(data)
1181
1182 def db_type(self) -> None:
1183 # A ManyToManyField is not represented by a single column,
1184 # so return None.
1185 return None