1from __future__ import annotations
2
3import copy
4from functools import cached_property, partial
5from typing import TYPE_CHECKING, Any, Self, cast
6
7from plain import exceptions
8from plain.postgres.constants import LOOKUP_SEP
9from plain.postgres.deletion import SET_DEFAULT, SET_NULL
10from plain.postgres.exceptions import FieldDoesNotExist, FieldError
11from plain.postgres.query_utils import PathInfo, Q
12from plain.postgres.utils import make_model_tuple
13from plain.preflight import PreflightResult
14from plain.runtime import SettingsReference
15
16from ..registry import models_registry
17from . import DbParameters, Field
18from .mixins import FieldCacheMixin
19from .related_descriptors import (
20 ForwardForeignKeyDescriptor,
21 ForwardManyToManyDescriptor,
22)
23from .related_lookups import (
24 RelatedExact,
25 RelatedGreaterThan,
26 RelatedGreaterThanOrEqual,
27 RelatedIn,
28 RelatedIsNull,
29 RelatedLessThan,
30 RelatedLessThanOrEqual,
31)
32from .reverse_related import ForeignKeyRel, ManyToManyRel
33
34if TYPE_CHECKING:
35 from plain.postgres.base import Model
36 from plain.postgres.connection import DatabaseConnection
37 from plain.postgres.fields.reverse_related import ForeignObjectRel
38
39RECURSIVE_RELATIONSHIP_CONSTANT = "self"
40
41
42def resolve_relation(
43 scope_model: type[Model], relation: type[Model] | str
44) -> type[Model] | str:
45 """
46 Transform relation into a model or fully-qualified model string of the form
47 "package_label.ModelName", relative to scope_model.
48
49 The relation argument can be:
50 * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
51 the model argument will be returned.
52 * A bare model name without an package_label, in which case scope_model's
53 package_label will be prepended.
54 * An "package_label.ModelName" string.
55 * A model class, which will be returned unchanged.
56 """
57 # Check for recursive relations
58 if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
59 relation = scope_model
60
61 # Look for an "app.Model" relation
62 if isinstance(relation, str):
63 if "." not in relation:
64 relation = f"{scope_model.model_options.package_label}.{relation}"
65
66 return relation
67
68
69def lazy_related_operation(
70 function: Any, model: type[Model], *related_models: type[Model] | str, **kwargs: Any
71) -> None:
72 """
73 Schedule `function` to be called once `model` and all `related_models`
74 have been imported and registered with the app registry. `function` will
75 be called with the newly-loaded model classes as its positional arguments,
76 plus any optional keyword arguments.
77
78 The `model` argument must be a model class. Each subsequent positional
79 argument is another model, or a reference to another model - see
80 `resolve_relation()` for the various forms these may take. Any relative
81 references will be resolved relative to `model`.
82
83 This is a convenience wrapper for `Packages.lazy_model_operation` - the app
84 registry model used is the one found in `model._model_meta.models_registry`.
85 """
86 models = [model] + [resolve_relation(model, rel) for rel in related_models]
87 model_keys = (make_model_tuple(m) for m in models)
88 models_registry = model._model_meta.models_registry
89 return models_registry.lazy_model_operation(
90 partial(function, **kwargs), *model_keys
91 )
92
93
94class RelatedField(FieldCacheMixin, Field):
95 """Base class that all relational fields inherit from."""
96
97 # RelatedField always has a remote_field (never None)
98 remote_field: ForeignObjectRel
99 # path_infos is implemented as @cached_property in subclasses (ForeignKey, ManyToManyField)
100 path_infos: list[PathInfo]
101
102 def __init__(
103 self,
104 *,
105 related_query_name: str | None = None,
106 limit_choices_to: Any = None,
107 **kwargs: Any,
108 ):
109 self._related_query_name = related_query_name
110 self._limit_choices_to = limit_choices_to
111 super().__init__(**kwargs)
112
113 def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
114 # Handle remote_field deepcopy for RelatedFields
115 obj = super().__deepcopy__(memodict)
116 obj.remote_field = copy.copy(self.remote_field)
117 if hasattr(self.remote_field, "field") and self.remote_field.field is self:
118 obj.remote_field.field = obj # type: ignore[misc]
119 return obj
120
121 @cached_property
122 def related_model(self) -> type[Model]:
123 # Can't cache this property until all the models are loaded.
124 models_registry.check_ready()
125 return self.remote_field.model
126
127 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
128 return [
129 *super().preflight(**kwargs),
130 *self._check_related_query_name_is_valid(),
131 *self._check_relation_model_exists(),
132 *self._check_clashes(),
133 ]
134
135 def _check_related_query_name_is_valid(self) -> list[PreflightResult]:
136 # Always validate related_query_name since it's still used for ORM queries
137 # (e.g., User.query.filter(articles__title="..."))
138 rel_query_name = self.related_query_name()
139 errors: list[PreflightResult] = []
140 if rel_query_name.endswith("_"):
141 errors.append(
142 PreflightResult(
143 fix=(
144 f"Reverse query name '{rel_query_name}' must not end with an underscore. "
145 "Use a different related_query_name."
146 ),
147 obj=self,
148 id="fields.related_field_accessor_clash",
149 )
150 )
151 if LOOKUP_SEP in rel_query_name:
152 errors.append(
153 PreflightResult(
154 fix=(
155 f"Reverse query name '{rel_query_name}' must not contain '{LOOKUP_SEP}'. "
156 "Use a different related_query_name."
157 ),
158 obj=self,
159 id="fields.related_field_query_name_clash",
160 )
161 )
162 return errors
163
164 def _check_relation_model_exists(self) -> list[PreflightResult]:
165 rel_is_missing = (
166 self.remote_field.model not in self.meta.models_registry.get_models()
167 )
168 rel_is_string = isinstance(self.remote_field.model, str)
169 model_name = (
170 self.remote_field.model
171 if rel_is_string
172 else self.remote_field.model.model_options.object_name
173 )
174 if rel_is_missing and rel_is_string:
175 return [
176 PreflightResult(
177 fix=(
178 f"Field defines a relation with model '{model_name}', which is either "
179 "not installed, or is abstract. Ensure the model is installed and not abstract."
180 ),
181 obj=self,
182 id="fields.related_model_not_installed",
183 )
184 ]
185 return []
186
187 def _check_clashes(self) -> list[PreflightResult]:
188 """Check accessor and reverse query name clashes."""
189 from plain.postgres.base import ModelBase
190
191 errors: list[PreflightResult] = []
192
193 # f.remote_field.model may be a string instead of a model. Skip if
194 # model name is not resolved.
195 if not isinstance(self.remote_field.model, ModelBase):
196 return []
197
198 # Consider that we are checking field `Model.foreign` and the models
199 # are:
200 #
201 # class Target(models.Model):
202 # model = models.IntegerField()
203 # model_set = models.IntegerField()
204 #
205 # class Model(models.Model):
206 # foreign = models.ForeignKeyField(Target)
207 # m2m = models.ManyToManyField(Target)
208
209 # rel_options.object_name == "Target"
210 rel_meta = self.remote_field.model._model_meta
211 rel_options = self.remote_field.model.model_options
212 rel_query_name = self.related_query_name() # i. e. "model"
213 # i.e. "package_label.Model.field".
214 field_name = f"{self.model.model_options.label}.{self.name}"
215
216 # Check clashes between reverse query name of `field`
217 # and any other field name.
218 potential_clashes = rel_meta.fields + rel_meta.many_to_many
219 for clash_field in potential_clashes:
220 # i.e. "package_label.Target.model_set".
221 clash_name = f"{rel_options.label}.{clash_field.name}"
222 if clash_field.name == rel_query_name:
223 errors.append(
224 PreflightResult(
225 fix=(
226 f"Reverse query name for '{field_name}' clashes with field name '{clash_name}'. "
227 f"Rename field '{clash_name}' or use a different related_query_name."
228 ),
229 obj=self,
230 id="fields.related_accessor_clash_manager",
231 )
232 )
233
234 return errors
235
236 def db_type(self) -> str | None:
237 # By default related field will not have a column as it relates to
238 # columns from another table.
239 return None
240
241 def contribute_to_class(self, cls: type[Model], name: str) -> None:
242 super().contribute_to_class(cls, name)
243
244 self.meta = cls._model_meta
245
246 if self.remote_field.related_query_name:
247 related_query_name = self.remote_field.related_query_name % {
248 "class": cls.__name__.lower(),
249 "package_label": cls.model_options.package_label.lower(),
250 }
251 self.remote_field.related_query_name = related_query_name
252
253 def resolve_related_class(
254 model: type[Model], related: type[Model], field: RelatedField
255 ) -> None:
256 field.remote_field.model = related
257 field.do_related_class(related, model)
258
259 lazy_related_operation(
260 resolve_related_class,
261 cls,
262 self.remote_field.model,
263 field=self,
264 )
265
266 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
267 name, path, args, kwargs = super().deconstruct()
268 if self._limit_choices_to:
269 kwargs["limit_choices_to"] = self._limit_choices_to
270 if self._related_query_name is not None:
271 kwargs["related_query_name"] = self._related_query_name
272 return name, path, args, kwargs
273
274 def set_attributes_from_rel(self) -> None:
275 self.name = self.name or (
276 self.remote_field.model.model_options.model_name + "_" + "id"
277 )
278 self.remote_field.set_field_name()
279
280 def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
281 self.set_attributes_from_rel()
282
283 def get_limit_choices_to(self) -> Any:
284 """
285 Return ``limit_choices_to`` for this model field.
286
287 If it is a callable, it will be invoked and the result will be
288 returned.
289 """
290 if callable(self.remote_field.limit_choices_to):
291 return self.remote_field.limit_choices_to() # type: ignore[call-top-callable]
292 return self.remote_field.limit_choices_to
293
294 def related_query_name(self) -> str:
295 """
296 Define the name that can be used to identify this related object in a
297 table-spanning query.
298 """
299 return (
300 self.remote_field.related_query_name or self.model.model_options.model_name
301 )
302
303 @property
304 def target_field(self) -> Field:
305 """
306 When filtering against this relation, return the field on the remote
307 model against which the filtering should happen.
308 """
309 target_fields = self.path_infos[-1].target_fields
310 if len(target_fields) > 1:
311 raise FieldError(
312 "The relation has multiple target fields, but only single target field "
313 "was asked for"
314 )
315 return target_fields[0]
316
317 def get_cache_name(self) -> str:
318 assert self.name is not None, "Field name must be set"
319 return self.name
320
321
322class ForeignKeyField(RelatedField):
323 """
324 Provide a many-to-one relation by adding a column to the local model
325 to hold the remote value.
326
327 ForeignKeyField targets the primary key (id) of the remote model.
328 """
329
330 empty_strings_allowed = False
331 default_error_messages = {
332 "invalid": "%(model)s instance with %(field)s %(value)r does not exist."
333 }
334 description = "Foreign Key (type determined by related field)"
335
336 def __init__(
337 self,
338 to: type[Model] | str,
339 on_delete: Any,
340 related_query_name: str | None = None,
341 limit_choices_to: Any = None,
342 db_index: bool = True,
343 db_constraint: bool = True,
344 **kwargs: Any,
345 ):
346 if not isinstance(to, str):
347 try:
348 to.model_options.model_name
349 except AttributeError:
350 raise TypeError(
351 f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ForeignKeyField must be "
352 f"either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
353 )
354 if not callable(on_delete):
355 raise TypeError("on_delete must be callable.")
356
357 self.remote_field = ForeignKeyRel(
358 self,
359 to,
360 related_query_name=related_query_name,
361 limit_choices_to=limit_choices_to,
362 on_delete=on_delete,
363 )
364
365 super().__init__(
366 related_query_name=related_query_name,
367 limit_choices_to=limit_choices_to,
368 **kwargs,
369 )
370 self.db_index = db_index
371 self.db_constraint = db_constraint
372
373 def __copy__(self) -> ForeignKeyField:
374 obj = super().__copy__()
375 # Remove any cached PathInfo values.
376 obj.__dict__.pop("path_infos", None)
377 obj.__dict__.pop("reverse_path_infos", None)
378 return obj
379
380 def __set__(self, instance: Any, value: Any) -> None:
381 """
382 Override Field's __set__ to clear cached related object when FK value changes.
383
384 This ensures that when you change obj.user_id, the cached obj.user is invalidated.
385 """
386 # Check if value is changing and clear cache if needed
387 if (
388 hasattr(self, "attname")
389 and instance.__dict__.get(self.attname) != value
390 and self.is_cached(instance)
391 ):
392 self.delete_cached_value(instance)
393
394 # Call parent's __set__ to do the actual assignment
395 super().__set__(instance, value)
396
397 @cached_property
398 def related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
399 return self.resolve_related_fields()
400
401 @cached_property
402 def reverse_related_fields(self) -> list[tuple[Field, Field]]:
403 return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]
404
405 @cached_property
406 def local_related_fields(self) -> tuple[Field, ...]:
407 return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)
408
409 @cached_property
410 def foreign_related_fields(self) -> tuple[Field, ...]:
411 return tuple(
412 rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
413 )
414
415 def get_forward_related_filter(self, obj: Model) -> dict[str, Any]:
416 """
417 Return the keyword arguments that when supplied to
418 self.model.object.filter(), would select all instances related through
419 this field to the remote obj. This is used to build the querysets
420 returned by related descriptors. obj is an instance of
421 self.related_field.model.
422 """
423 return {
424 f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
425 for _, rh_field in self.related_fields
426 }
427
428 def get_reverse_related_filter(self, obj: Model) -> Q:
429 """
430 Complement to get_forward_related_filter(). Return the keyword
431 arguments that when passed to self.related_field.model.object.filter()
432 select all instances of self.related_field.model related through
433 this field to obj. obj is an instance of self.model.
434 """
435 return Q.create(
436 [
437 (rh_field.attname, getattr(obj, lh_field.attname))
438 for lh_field, rh_field in self.related_fields
439 ]
440 )
441
442 def get_local_related_value(self, instance: Model) -> tuple[Any, ...]:
443 # Always returns the value of the single local field
444 field = self.local_related_fields[0]
445 if field.primary_key:
446 return (instance.id,)
447 return (getattr(instance, field.attname),)
448
449 def get_foreign_related_value(self, instance: Model) -> tuple[Any, ...]:
450 # Always returns the id of the foreign instance
451 return (instance.id,)
452
453 def get_joining_columns(
454 self, reverse_join: bool = False
455 ) -> tuple[tuple[str, str], ...]:
456 # Always returns a single column pair
457 if reverse_join:
458 from_field, to_field = self.related_fields[0]
459 return ((to_field.column, from_field.column),)
460 else:
461 from_field, to_field = self.related_fields[0]
462 return ((from_field.column, to_field.column),)
463
464 def get_reverse_joining_columns(self) -> tuple[tuple[str, str], ...]:
465 return self.get_joining_columns(reverse_join=True)
466
467 def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
468 """Get path from this field to the related model."""
469 meta = self.remote_field.model._model_meta
470 from_meta = self.model._model_meta
471 return [
472 PathInfo(
473 from_meta=from_meta,
474 to_meta=meta,
475 target_fields=self.foreign_related_fields,
476 join_field=self,
477 m2m=False,
478 direct=True,
479 filtered_relation=filtered_relation,
480 )
481 ]
482
483 @cached_property
484 def path_infos(self) -> list[PathInfo]:
485 return self.get_path_info()
486
487 def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
488 """Get path from the related model to this field's model."""
489 meta = self.model._model_meta
490 from_meta = self.remote_field.model._model_meta
491 return [
492 PathInfo(
493 from_meta=from_meta,
494 to_meta=meta,
495 target_fields=(meta.get_forward_field("id"),),
496 join_field=self.remote_field,
497 m2m=not self.primary_key,
498 direct=False,
499 filtered_relation=filtered_relation,
500 )
501 ]
502
503 @cached_property
504 def reverse_path_infos(self) -> list[PathInfo]:
505 return self.get_reverse_path_info()
506
507 def contribute_to_class(self, cls: type[Model], name: str) -> None:
508 super().contribute_to_class(cls, name)
509 setattr(cls, name, ForwardForeignKeyDescriptor(self))
510
511 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
512 return [
513 *super().preflight(**kwargs),
514 *self._check_on_delete(),
515 ]
516
517 def _check_on_delete(self) -> list[PreflightResult]:
518 on_delete = getattr(self.remote_field, "on_delete", None)
519 if on_delete == SET_NULL and not self.allow_null:
520 return [
521 PreflightResult(
522 fix=(
523 "Field specifies on_delete=SET_NULL, but cannot be null. "
524 "Set allow_null=True argument on the field, or change the on_delete rule."
525 ),
526 obj=self,
527 id="fields.foreign_key_null_constraint_violation",
528 )
529 ]
530 elif on_delete == SET_DEFAULT and not self.has_default():
531 return [
532 PreflightResult(
533 fix=(
534 "Field specifies on_delete=SET_DEFAULT, but has no default value. "
535 "Set a default value, or change the on_delete rule."
536 ),
537 obj=self,
538 id="fields.foreign_key_set_default_no_default",
539 )
540 ]
541 else:
542 return []
543
544 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
545 name, path, args, kwargs = super().deconstruct()
546 kwargs["on_delete"] = self.remote_field.on_delete
547
548 if isinstance(self.remote_field.model, SettingsReference):
549 kwargs["to"] = self.remote_field.model
550 elif isinstance(self.remote_field.model, str):
551 if "." in self.remote_field.model:
552 package_label, model_name = self.remote_field.model.split(".")
553 kwargs["to"] = f"{package_label}.{model_name.lower()}"
554 else:
555 kwargs["to"] = self.remote_field.model.lower()
556 else:
557 kwargs["to"] = self.remote_field.model.model_options.label_lower
558
559 if self.db_index is not True:
560 kwargs["db_index"] = self.db_index
561
562 if self.db_constraint is not True:
563 kwargs["db_constraint"] = self.db_constraint
564
565 return name, path, args, kwargs
566
567 def to_python(self, value: Any) -> Any:
568 return self.target_field.to_python(value)
569
570 @property
571 def target_field(self) -> Field:
572 return self.foreign_related_fields[0]
573
574 def validate(self, value: Any, model_instance: Model) -> None:
575 super().validate(value, model_instance)
576 if value is None:
577 return None
578
579 field_name = self.remote_field.field_name
580 if field_name is None:
581 raise ValueError("remote_field.field_name cannot be None")
582 qs = self.remote_field.model._model_meta.base_queryset.filter(
583 **{field_name: value}
584 )
585 qs = qs.complex_filter(self.get_limit_choices_to())
586 if not qs.exists():
587 raise exceptions.ValidationError(
588 self.error_messages["invalid"],
589 code="invalid",
590 params={
591 "model": self.remote_field.model.model_options.model_name,
592 "id": value,
593 "field": self.remote_field.field_name,
594 "value": value,
595 },
596 )
597
598 def resolve_related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
599 if isinstance(self.remote_field.model, str):
600 raise ValueError(
601 f"Related model {self.remote_field.model!r} cannot be resolved"
602 )
603 from_field = self
604 to_field = self.remote_field.model._model_meta.get_forward_field("id")
605 related_fields: list[tuple[ForeignKeyField, Field]] = [(from_field, to_field)]
606
607 for from_field, to_field in related_fields:
608 if to_field and to_field.model != self.remote_field.model:
609 raise FieldError(
610 f"'{self.model.model_options.label}.{self.name}' refers to field '{to_field.name}' which is not local to model "
611 f"'{self.remote_field.model.model_options.label}'."
612 )
613 return related_fields
614
615 def get_attname(self) -> str:
616 return f"{self.name}_id"
617
618 def get_default(self) -> Any:
619 """Return the to_field if the default value is an object."""
620 field_default = super().get_default()
621 if isinstance(field_default, self.remote_field.model):
622 return getattr(field_default, self.target_field.attname)
623 return field_default
624
625 def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
626 if value is None or (
627 value == "" and not self.target_field.empty_strings_allowed
628 ):
629 return None
630 else:
631 return self.target_field.get_db_prep_save(value, connection=connection)
632
633 def get_db_prep_value(
634 self, value: Any, connection: DatabaseConnection, prepared: bool = False
635 ) -> Any:
636 return self.target_field.get_db_prep_value(value, connection, prepared)
637
638 def get_prep_value(self, value: Any) -> Any:
639 return self.target_field.get_prep_value(value)
640
641 def db_check(self) -> None:
642 return None
643
644 def db_type(self) -> str | None:
645 return self.target_field.rel_db_type()
646
647 def cast_db_type(self) -> str | None:
648 return self.target_field.cast_db_type()
649
650 def db_parameters(self) -> DbParameters:
651 return {
652 "type": self.db_type(),
653 "check": self.db_check(),
654 }
655
656 def get_col(self, alias: str | None, output_field: Field | None = None) -> Any:
657 if output_field is None:
658 output_field = self.target_field
659 while isinstance(output_field, ForeignKeyField):
660 output_field = output_field.target_field
661 if output_field is self:
662 raise ValueError("Cannot resolve output_field.")
663 return super().get_col(alias, output_field)
664
665
666# Register lookups for ForeignKey
667ForeignKeyField.register_lookup(RelatedIn)
668ForeignKeyField.register_lookup(RelatedExact)
669ForeignKeyField.register_lookup(RelatedLessThan)
670ForeignKeyField.register_lookup(RelatedGreaterThan)
671ForeignKeyField.register_lookup(RelatedGreaterThanOrEqual)
672ForeignKeyField.register_lookup(RelatedLessThanOrEqual)
673ForeignKeyField.register_lookup(RelatedIsNull)
674
675
676class ManyToManyField(RelatedField):
677 """
678 Provide a many-to-many relation by using an intermediary model that
679 holds two ForeignKeyField fields pointed at the two sides of the relation.
680
681 Unless a ``through`` model was provided, ManyToManyField will use the
682 create_many_to_many_intermediary_model factory to automatically generate
683 the intermediary model.
684 """
685
686 # ManyToManyField uses ManyToManyRel which has through/through_fields
687 remote_field: ManyToManyRel
688
689 description = "Many-to-many relationship"
690
691 def __init__(
692 self,
693 to: type[Model] | str,
694 *,
695 through: type[Model] | str,
696 through_fields: tuple[str, str] | None = None,
697 related_query_name: str | None = None,
698 limit_choices_to: Any = None,
699 symmetrical: bool | None = None,
700 **kwargs: Any,
701 ):
702 if not isinstance(to, str):
703 try:
704 to._model_meta
705 except AttributeError:
706 raise TypeError(
707 f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ManyToManyField "
708 f"must be either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
709 )
710
711 if symmetrical is None:
712 symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
713
714 if not through:
715 raise ValueError("ManyToManyField must have a 'through' argument.")
716
717 self.remote_field = ManyToManyRel(
718 self,
719 to,
720 related_query_name=related_query_name,
721 limit_choices_to=limit_choices_to,
722 symmetrical=symmetrical,
723 through=through,
724 through_fields=through_fields,
725 )
726 self.has_null_arg = "allow_null" in kwargs
727
728 super().__init__(
729 related_query_name=related_query_name,
730 limit_choices_to=limit_choices_to,
731 **kwargs,
732 )
733
734 def preflight(self, **kwargs: Any) -> list[PreflightResult]:
735 return [
736 *super().preflight(**kwargs),
737 *self._check_relationship_model(**kwargs),
738 *self._check_ignored_options(**kwargs),
739 *self._check_table_uniqueness(**kwargs),
740 ]
741
742 def _check_ignored_options(self, **kwargs: Any) -> list[PreflightResult]:
743 warnings: list[PreflightResult] = []
744
745 if self.has_null_arg:
746 warnings.append(
747 PreflightResult(
748 fix="The 'null' option has no effect on ManyToManyField. Remove the 'null' argument.",
749 obj=self,
750 id="fields.m2m_null_has_no_effect",
751 warning=True,
752 )
753 )
754
755 if self._validators:
756 warnings.append(
757 PreflightResult(
758 fix="ManyToManyField does not support validators. Remove validators from this field.",
759 obj=self,
760 id="fields.m2m_validators_not_supported",
761 warning=True,
762 )
763 )
764
765 return warnings
766
767 def _check_relationship_model(
768 self, from_model: type[Model] | None = None, **kwargs: Any
769 ) -> list[PreflightResult]:
770 if hasattr(self.remote_field.through, "_model_meta"):
771 qualified_model_name = f"{self.remote_field.through.model_options.package_label}.{self.remote_field.through.__name__}"
772 else:
773 qualified_model_name = self.remote_field.through
774
775 errors = []
776
777 if self.remote_field.through not in self.meta.models_registry.get_models():
778 # The relationship model is not installed.
779 errors.append(
780 PreflightResult(
781 fix=(
782 "Field specifies a many-to-many relation through model "
783 f"'{qualified_model_name}', which has not been installed. "
784 "Ensure the through model is properly defined and installed."
785 ),
786 obj=self,
787 id="fields.m2m_through_model_not_installed",
788 )
789 )
790
791 else:
792 assert from_model is not None, (
793 "ManyToManyField with intermediate "
794 "tables cannot be checked if you don't pass the model "
795 "where the field is attached to."
796 )
797 # Set some useful local variables
798 to_model = resolve_relation(from_model, self.remote_field.model)
799 from_model_name = from_model.model_options.object_name
800 if isinstance(to_model, str):
801 to_model_name = to_model
802 else:
803 to_model_name = to_model.model_options.object_name
804 relationship_model_name = (
805 self.remote_field.through.model_options.object_name
806 )
807 self_referential = from_model == to_model
808 # Count foreign keys in intermediate model
809 if self_referential:
810 seen_self = sum(
811 from_model == field.remote_field.model
812 for field in self.remote_field.through._model_meta.fields
813 if isinstance(field, RelatedField)
814 )
815
816 if seen_self > 2 and not self.remote_field.through_fields:
817 errors.append(
818 PreflightResult(
819 fix=(
820 "The model is used as an intermediate model by "
821 f"'{self}', but it has more than two foreign keys "
822 f"to '{from_model_name}', which is ambiguous. "
823 "Use through_fields to specify which two foreign keys "
824 "Plain should use."
825 ),
826 obj=self.remote_field.through,
827 id="fields.m2m_through_model_ambiguous_fks",
828 )
829 )
830
831 else:
832 # Count foreign keys in relationship model
833 seen_from = sum(
834 from_model == field.remote_field.model
835 for field in self.remote_field.through._model_meta.fields
836 if isinstance(field, RelatedField)
837 )
838 seen_to = sum(
839 to_model == field.remote_field.model
840 for field in self.remote_field.through._model_meta.fields
841 if isinstance(field, RelatedField)
842 )
843
844 if seen_from > 1 and not self.remote_field.through_fields:
845 errors.append(
846 PreflightResult(
847 fix=(
848 "The model is used as an intermediate model by "
849 f"'{self}', but it has more than one foreign key "
850 f"from '{from_model_name}', which is ambiguous. You must specify "
851 "which foreign key Plain should use via the "
852 "through_fields keyword argument. "
853 "If you want to create a recursive relationship, "
854 f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
855 ),
856 obj=self,
857 id="fields.m2m_through_model_invalid_recursive_from",
858 )
859 )
860
861 if seen_to > 1 and not self.remote_field.through_fields:
862 errors.append(
863 PreflightResult(
864 fix=(
865 "The model is used as an intermediate model by "
866 f"'{self}', but it has more than one foreign key "
867 f"to '{to_model_name}', which is ambiguous. You must specify "
868 "which foreign key Plain should use via the "
869 "through_fields keyword argument. "
870 "If you want to create a recursive relationship, "
871 f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
872 ),
873 obj=self,
874 id="fields.m2m_through_model_invalid_recursive_to",
875 )
876 )
877
878 if seen_from == 0 or seen_to == 0:
879 errors.append(
880 PreflightResult(
881 fix=(
882 "The model is used as an intermediate model by "
883 f"'{self}', but it does not have a foreign key to '{from_model_name}' or '{to_model_name}'. "
884 "Add the required foreign keys to the through model."
885 ),
886 obj=self.remote_field.through,
887 id="fields.m2m_through_model_missing_fk",
888 )
889 )
890
891 # Validate `through_fields`.
892 if self.remote_field.through_fields is not None:
893 # Validate that we're given an iterable of at least two items
894 # and that none of them is "falsy".
895 if not (
896 len(self.remote_field.through_fields) >= 2
897 and self.remote_field.through_fields[0]
898 and self.remote_field.through_fields[1]
899 ):
900 errors.append(
901 PreflightResult(
902 fix=(
903 "Field specifies 'through_fields' but does not provide "
904 "the names of the two link fields that should be used "
905 f"for the relation through model '{qualified_model_name}'. "
906 "Make sure you specify 'through_fields' as "
907 "through_fields=('field1', 'field2')."
908 ),
909 obj=self,
910 id="fields.m2m_through_fields_wrong_length",
911 )
912 )
913
914 # Validate the given through fields -- they should be actual
915 # fields on the through model, and also be foreign keys to the
916 # expected models.
917 else:
918 assert from_model is not None, (
919 "ManyToManyField with intermediate "
920 "tables cannot be checked if you don't pass the model "
921 "where the field is attached to."
922 )
923
924 source, through, target = (
925 from_model,
926 self.remote_field.through,
927 self.remote_field.model,
928 )
929 source_field_name, target_field_name = self.remote_field.through_fields[
930 :2
931 ]
932
933 for field_name, related_model in (
934 (source_field_name, source),
935 (target_field_name, target),
936 ):
937 possible_field_names = []
938 for f in through._model_meta.fields:
939 if (
940 hasattr(f, "remote_field")
941 and getattr(f.remote_field, "model", None) == related_model
942 ):
943 possible_field_names.append(f.name)
944 if possible_field_names:
945 fix = (
946 "Did you mean one of the following foreign keys to '{}': "
947 "{}?".format(
948 related_model.model_options.object_name,
949 ", ".join(possible_field_names),
950 )
951 )
952 else:
953 fix = ""
954
955 try:
956 field = through._model_meta.get_forward_field(field_name)
957 except FieldDoesNotExist:
958 errors.append(
959 PreflightResult(
960 fix=f"The intermediary model '{qualified_model_name}' has no field '{field_name}'. {fix}",
961 obj=self,
962 id="fields.m2m_through_field_not_found",
963 )
964 )
965 else:
966 if not (
967 isinstance(field, RelatedField)
968 and field.remote_field.model == related_model
969 ):
970 errors.append(
971 PreflightResult(
972 fix=f"'{through.model_options.object_name}.{field_name}' is not a foreign key to '{related_model.model_options.object_name}'. {fix}",
973 obj=self,
974 id="fields.m2m_through_field_not_fk_to_model",
975 )
976 )
977
978 return errors
979
980 def _check_table_uniqueness(self, **kwargs: Any) -> list[PreflightResult]:
981 if isinstance(self.remote_field.through, str):
982 return []
983 registered_tables = {
984 model.model_options.db_table: model
985 for model in self.meta.models_registry.get_models()
986 if model != self.remote_field.through
987 }
988 m2m_db_table = self.m2m_db_table()
989 model = registered_tables.get(m2m_db_table)
990 # Check if there's already a m2m field using the same through model.
991 if model and model != self.remote_field.through:
992 clashing_obj = model.model_options.label
993 return [
994 PreflightResult(
995 fix=(
996 f"The field's intermediary table '{m2m_db_table}' clashes with the "
997 f"table name of '{clashing_obj}'. "
998 "Change the through model's db_table or use a different model."
999 ),
1000 obj=self,
1001 id="fields.m2m_table_name_clash",
1002 )
1003 ]
1004 return []
1005
1006 def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1007 name, path, args, kwargs = super().deconstruct()
1008
1009 if self.remote_field.db_constraint is not True:
1010 kwargs["db_constraint"] = self.remote_field.db_constraint
1011
1012 # Lowercase model names as they should be treated as case-insensitive.
1013 if isinstance(self.remote_field.model, str):
1014 if "." in self.remote_field.model:
1015 package_label, model_name = self.remote_field.model.split(".")
1016 kwargs["to"] = f"{package_label}.{model_name.lower()}"
1017 else:
1018 kwargs["to"] = self.remote_field.model.lower()
1019 else:
1020 kwargs["to"] = self.remote_field.model.model_options.label_lower
1021
1022 if isinstance(self.remote_field.through, str):
1023 kwargs["through"] = self.remote_field.through
1024 else:
1025 kwargs["through"] = self.remote_field.through.model_options.label
1026
1027 return name, path, args, kwargs
1028
1029 def _get_path_info(
1030 self, direct: bool = False, filtered_relation: Any = None
1031 ) -> list[PathInfo]:
1032 """Called by both direct and indirect m2m traversal."""
1033 int_model = self.remote_field.through
1034 # M2M through model fields are always ForeignKey
1035 linkfield1 = cast(
1036 ForeignKeyField,
1037 int_model._model_meta.get_forward_field(self.m2m_field_name()),
1038 )
1039 linkfield2 = cast(
1040 ForeignKeyField,
1041 int_model._model_meta.get_forward_field(self.m2m_reverse_field_name()),
1042 )
1043 if direct:
1044 join1infos = linkfield1.reverse_path_infos
1045 if filtered_relation:
1046 join2infos = linkfield2.get_path_info(filtered_relation)
1047 else:
1048 join2infos = linkfield2.path_infos
1049 else:
1050 join1infos = linkfield2.reverse_path_infos
1051 if filtered_relation:
1052 join2infos = linkfield1.get_path_info(filtered_relation)
1053 else:
1054 join2infos = linkfield1.path_infos
1055
1056 return [*join1infos, *join2infos]
1057
1058 def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1059 return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1060
1061 @cached_property
1062 def path_infos(self) -> list[PathInfo]:
1063 return self.get_path_info()
1064
1065 def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1066 return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1067
1068 @cached_property
1069 def reverse_path_infos(self) -> list[PathInfo]:
1070 return self.get_reverse_path_info()
1071
1072 def _get_m2m_db_table(self) -> str:
1073 """
1074 Function that can be curried to provide the m2m table name for this
1075 relation.
1076 """
1077 return self.remote_field.through.model_options.db_table
1078
1079 def _get_m2m_attr(self, related: Any, attr: str) -> Any:
1080 """
1081 Function that can be curried to provide the source accessor or DB
1082 column name for the m2m table.
1083 """
1084 cache_attr = f"_m2m_{attr}_cache"
1085 if hasattr(self, cache_attr):
1086 return getattr(self, cache_attr)
1087 if self.remote_field.through_fields is not None:
1088 link_field_name: str | None = self.remote_field.through_fields[0]
1089 else:
1090 link_field_name = None
1091 for f in self.remote_field.through._model_meta.fields:
1092 if (
1093 isinstance(f, RelatedField)
1094 and f.remote_field.model == related.related_model
1095 and (link_field_name is None or link_field_name == f.name)
1096 ):
1097 setattr(self, cache_attr, getattr(f, attr))
1098 return getattr(self, cache_attr)
1099 return None
1100
1101 def _get_m2m_reverse_attr(self, related: Any, attr: str) -> Any:
1102 """
1103 Function that can be curried to provide the related accessor or DB
1104 column name for the m2m table.
1105 """
1106 cache_attr = f"_m2m_reverse_{attr}_cache"
1107 if hasattr(self, cache_attr):
1108 return getattr(self, cache_attr)
1109 found = False
1110 if self.remote_field.through_fields is not None:
1111 link_field_name: str | None = self.remote_field.through_fields[1]
1112 else:
1113 link_field_name = None
1114 for f in self.remote_field.through._model_meta.fields:
1115 if isinstance(f, RelatedField) and f.remote_field.model == related.model:
1116 if link_field_name is None and related.related_model == related.model:
1117 # If this is an m2m-intermediate to self,
1118 # the first foreign key you find will be
1119 # the source column. Keep searching for
1120 # the second foreign key.
1121 if found:
1122 setattr(self, cache_attr, getattr(f, attr))
1123 break
1124 else:
1125 found = True
1126 elif link_field_name is None or link_field_name == f.name:
1127 setattr(self, cache_attr, getattr(f, attr))
1128 break
1129 return getattr(self, cache_attr)
1130
1131 def contribute_to_class(self, cls: type[Model], name: str) -> None:
1132 super().contribute_to_class(cls, name)
1133
1134 def resolve_through_model(
1135 _: Any, model: type[Model], field: ManyToManyField
1136 ) -> None:
1137 field.remote_field.through = model
1138
1139 lazy_related_operation(
1140 resolve_through_model,
1141 cls,
1142 self.remote_field.through,
1143 field=self,
1144 )
1145
1146 # Add the descriptor for the m2m relation.
1147 setattr(cls, self.name, ForwardManyToManyDescriptor(self.remote_field)) # type: ignore[arg-type]
1148
1149 # Set up the accessor for the m2m table name for the relation.
1150 self.m2m_db_table = self._get_m2m_db_table
1151
1152 def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
1153 """Set up M2M metadata accessors for the through table."""
1154 super().do_related_class(other, cls)
1155
1156 # Set up the accessors for the column names on the m2m table.
1157 # These are used during query construction and schema operations.
1158 related = self.remote_field
1159 self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1160 self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1161
1162 self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1163 self.m2m_reverse_field_name = partial(
1164 self._get_m2m_reverse_attr, related, "name"
1165 )
1166
1167 get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1168 self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1169 get_m2m_reverse_rel = partial(
1170 self._get_m2m_reverse_attr, related, "remote_field"
1171 )
1172 self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1173
1174 def set_attributes_from_rel(self) -> None:
1175 pass
1176
1177 def value_from_object(self, obj: Model) -> list[Any]:
1178 return [] if obj.id is None else list(getattr(obj, self.attname).all())
1179
1180 def save_form_data(self, instance: Model, data: Any) -> None:
1181 getattr(instance, self.attname).set(data)
1182
1183 def db_check(self) -> None:
1184 return None
1185
1186 def db_type(self) -> None:
1187 # A ManyToManyField is not represented by a single column,
1188 # so return None.
1189 return None
1190
1191 def db_parameters(self) -> DbParameters:
1192 return {"type": None, "check": None}