v0.146.0
   1from __future__ import annotations
   2
   3import copy
   4import operator
   5from collections.abc import Callable, Sequence
   6from functools import cached_property, partial
   7from typing import TYPE_CHECKING, Any, Self, cast
   8
   9from plain import exceptions
  10from plain.postgres.constants import LOOKUP_SEP
  11from plain.postgres.deletion import NO_ACTION, SET_NULL, OnDelete
  12from plain.postgres.exceptions import FieldDoesNotExist, FieldError
  13from plain.postgres.query_utils import PathInfo, Q
  14from plain.postgres.utils import make_model_tuple
  15from plain.preflight import PreflightResult
  16
  17from ..registry import models_registry
  18from . import BLANK_CHOICE_DASH, Field
  19from .base import ColumnField
  20from .mixins import FieldCacheMixin
  21from .related_descriptors import (
  22    ForwardForeignKeyDescriptor,
  23    ForwardManyToManyDescriptor,
  24)
  25from .related_lookups import (
  26    RelatedExact,
  27    RelatedGreaterThan,
  28    RelatedGreaterThanOrEqual,
  29    RelatedIn,
  30    RelatedIsNull,
  31    RelatedLessThan,
  32    RelatedLessThanOrEqual,
  33)
  34from .reverse_related import ForeignKeyRel, ManyToManyRel
  35
  36if TYPE_CHECKING:
  37    from plain.postgres.base import Model
  38    from plain.postgres.connection import DatabaseConnection
  39    from plain.postgres.fields.reverse_related import ForeignObjectRel
  40
# Relation target string meaning "the model that declares the field";
# resolve_relation() replaces it with the scope model.
RECURSIVE_RELATIONSHIP_CONSTANT = "self"
  42
  43
  44def resolve_relation(
  45    scope_model: type[Model], relation: type[Model] | str
  46) -> type[Model] | str:
  47    """
  48    Transform relation into a model or fully-qualified model string of the form
  49    "package_label.ModelName", relative to scope_model.
  50
  51    The relation argument can be:
  52      * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
  53        the model argument will be returned.
  54      * A bare model name without an package_label, in which case scope_model's
  55        package_label will be prepended.
  56      * An "package_label.ModelName" string.
  57      * A model class, which will be returned unchanged.
  58    """
  59    # Check for recursive relations
  60    if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
  61        relation = scope_model
  62
  63    # Look for an "app.Model" relation
  64    if isinstance(relation, str):
  65        if "." not in relation:
  66            relation = f"{scope_model.model_options.package_label}.{relation}"
  67
  68    return relation
  69
  70
  71def lazy_related_operation(
  72    function: Any, model: type[Model], *related_models: type[Model] | str, **kwargs: Any
  73) -> None:
  74    """
  75    Schedule `function` to be called once `model` and all `related_models`
  76    have been imported and registered with the app registry. `function` will
  77    be called with the newly-loaded model classes as its positional arguments,
  78    plus any optional keyword arguments.
  79
  80    The `model` argument must be a model class. Each subsequent positional
  81    argument is another model, or a reference to another model - see
  82    `resolve_relation()` for the various forms these may take. Any relative
  83    references will be resolved relative to `model`.
  84
  85    This is a convenience wrapper for `Packages.lazy_model_operation` - the app
  86    registry model used is the one found in `model._model_meta.models_registry`.
  87    """
  88    models = [model] + [resolve_relation(model, rel) for rel in related_models]
  89    model_keys = (make_model_tuple(m) for m in models)
  90    models_registry = model._model_meta.models_registry
  91    return models_registry.lazy_model_operation(
  92        partial(function, **kwargs), *model_keys
  93    )
  94
  95
  96class RelatedField(FieldCacheMixin, Field):
  97    """Base class that all relational fields inherit from."""
  98
  99    non_migration_attrs = (
 100        *Field.non_migration_attrs,
 101        "limit_choices_to",
 102        "related_query_name",
 103    )
 104
 105    # RelatedField always has a remote_field (never None)
 106    remote_field: ForeignObjectRel
 107    # path_infos is implemented as @cached_property in subclasses (ForeignKey, ManyToManyField)
 108    path_infos: list[PathInfo]
 109    # Set by ForeignKeyField / ManyToManyField in their __init__; declared
 110    # here so RelatedField methods (deconstruct, related_query_name) can
 111    # reference them without isinstance-narrowing.
 112    _related_query_name: str | None
 113    _limit_choices_to: Any
 114
 115    # No __init__: ForeignKeyField and ManyToManyField each set
 116    # _related_query_name, _limit_choices_to, and remote_field themselves.
 117
 118    def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
 119        # Handle remote_field deepcopy for RelatedFields
 120        obj = super().__deepcopy__(memodict)
 121        obj.remote_field = copy.copy(self.remote_field)
 122        if hasattr(self.remote_field, "field") and self.remote_field.field is self:
 123            obj.remote_field.field = obj  # ty: ignore[invalid-assignment]
 124        return obj
 125
 126    @cached_property
 127    def related_model(self) -> type[Model]:
 128        # Can't cache this property until all the models are loaded.
 129        models_registry.check_ready()
 130        return self.remote_field.model
 131
 132    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 133        return [
 134            *super().preflight(**kwargs),
 135            *self._check_related_query_name_is_valid(),
 136            *self._check_relation_model_exists(),
 137            *self._check_clashes(),
 138        ]
 139
 140    def _check_related_query_name_is_valid(self) -> list[PreflightResult]:
 141        # Always validate related_query_name since it's still used for ORM queries
 142        # (e.g., User.query.filter(articles__title="..."))
 143        rel_query_name = self.related_query_name()
 144        errors: list[PreflightResult] = []
 145        if rel_query_name.endswith("_"):
 146            errors.append(
 147                PreflightResult(
 148                    fix=(
 149                        f"Reverse query name '{rel_query_name}' must not end with an underscore. "
 150                        "Use a different related_query_name."
 151                    ),
 152                    obj=self,
 153                    id="fields.related_field_accessor_clash",
 154                )
 155            )
 156        if LOOKUP_SEP in rel_query_name:
 157            errors.append(
 158                PreflightResult(
 159                    fix=(
 160                        f"Reverse query name '{rel_query_name}' must not contain '{LOOKUP_SEP}'. "
 161                        "Use a different related_query_name."
 162                    ),
 163                    obj=self,
 164                    id="fields.related_field_query_name_clash",
 165                )
 166            )
 167        return errors
 168
 169    def _check_relation_model_exists(self) -> list[PreflightResult]:
 170        rel_is_missing = (
 171            self.remote_field.model not in self.meta.models_registry.get_models()
 172        )
 173        rel_is_string = isinstance(self.remote_field.model, str)
 174        model_name = (
 175            self.remote_field.model
 176            if rel_is_string
 177            else self.remote_field.model.model_options.object_name
 178        )
 179        if rel_is_missing and rel_is_string:
 180            return [
 181                PreflightResult(
 182                    fix=(
 183                        f"Field defines a relation with model '{model_name}', which is not "
 184                        "installed. Ensure the model's package is registered."
 185                    ),
 186                    obj=self,
 187                    id="fields.related_model_not_installed",
 188                )
 189            ]
 190        return []
 191
 192    def _check_clashes(self) -> list[PreflightResult]:
 193        """Check accessor and reverse query name clashes."""
 194        from plain.postgres.base import ModelBase
 195
 196        errors: list[PreflightResult] = []
 197
 198        # f.remote_field.model may be a string instead of a model. Skip if
 199        # model name is not resolved.
 200        if not isinstance(self.remote_field.model, ModelBase):
 201            return []
 202
 203        # Consider that we are checking field `Model.foreign` and the models
 204        # are:
 205        #
 206        #     class Target(models.Model):
 207        #         model = models.IntegerField()
 208        #         model_set = models.IntegerField()
 209        #
 210        #     class Model(models.Model):
 211        #         foreign = models.ForeignKeyField(Target)
 212        #         m2m = models.ManyToManyField(Target)
 213
 214        # rel_options.object_name == "Target"
 215        rel_meta = self.remote_field.model._model_meta
 216        rel_options = self.remote_field.model.model_options
 217        rel_query_name = self.related_query_name()  # i. e. "model"
 218        # i.e. "package_label.Model.field".
 219        field_name = f"{self.model.model_options.label}.{self.name}"
 220
 221        # Check clashes between reverse query name of `field`
 222        # and any other field name.
 223        potential_clashes = rel_meta.fields + rel_meta.many_to_many
 224        for clash_field in potential_clashes:
 225            # i.e. "package_label.Target.model_set".
 226            clash_name = f"{rel_options.label}.{clash_field.name}"
 227            if clash_field.name == rel_query_name:
 228                errors.append(
 229                    PreflightResult(
 230                        fix=(
 231                            f"Reverse query name for '{field_name}' clashes with field name '{clash_name}'. "
 232                            f"Rename field '{clash_name}' or use a different related_query_name."
 233                        ),
 234                        obj=self,
 235                        id="fields.related_accessor_clash_manager",
 236                    )
 237                )
 238
 239        return errors
 240
 241    def db_type(self) -> str | None:
 242        # By default related field will not have a column as it relates to
 243        # columns from another table.
 244        return None
 245
 246    def unqualified_db_type(self) -> str | None:
 247        return self.rel_db_type()
 248
 249    def contribute_to_class(self, cls: type[Model], name: str) -> None:
 250        super().contribute_to_class(cls, name)
 251
 252        self.meta = cls._model_meta
 253
 254        if self.remote_field.related_query_name:
 255            related_query_name = self.remote_field.related_query_name % {
 256                "class": cls.__name__.lower(),
 257                "package_label": cls.model_options.package_label.lower(),
 258            }
 259            self.remote_field.related_query_name = related_query_name
 260
 261        def resolve_related_class(
 262            model: type[Model], related: type[Model], field: RelatedField
 263        ) -> None:
 264            field.remote_field.model = related
 265            field.do_related_class(related, model)
 266
 267        lazy_related_operation(
 268            resolve_related_class,
 269            cls,
 270            self.remote_field.model,
 271            field=self,
 272        )
 273
 274    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
 275        name, path, args, kwargs = super().deconstruct()
 276        if self._limit_choices_to:
 277            kwargs["limit_choices_to"] = self._limit_choices_to
 278        if self._related_query_name is not None:
 279            kwargs["related_query_name"] = self._related_query_name
 280        return name, path, args, kwargs
 281
 282    def set_attributes_from_rel(self) -> None:
 283        self.name = self.name or (
 284            self.remote_field.model.model_options.model_name + "_" + "id"
 285        )
 286        self.remote_field.set_field_name()
 287
 288    def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
 289        self.set_attributes_from_rel()
 290
 291    def get_limit_choices_to(self) -> Any:
 292        """
 293        Return ``limit_choices_to`` for this model field.
 294
 295        If it is a callable, it will be invoked and the result will be
 296        returned.
 297        """
 298        if callable(self.remote_field.limit_choices_to):
 299            return self.remote_field.limit_choices_to()  # ty: ignore[call-top-callable]
 300        return self.remote_field.limit_choices_to
 301
 302    def get_choices(
 303        self,
 304        include_blank: bool = True,
 305        blank_choice: list[tuple[str, str]] = BLANK_CHOICE_DASH,
 306        limit_choices_to: Any = None,
 307        ordering: tuple[str, ...] = (),
 308    ) -> list[tuple[Any, str]]:
 309        """Return choices from the related model, for use as <select> options."""
 310        rel_model = self.remote_field.model
 311        if rel_model is None:
 312            return blank_choice if include_blank else []
 313        limit_choices_to = limit_choices_to or self.get_limit_choices_to()
 314        get_related_field = getattr(self.remote_field, "get_related_field", None)
 315        related_field_name = (
 316            get_related_field().attname if get_related_field is not None else "id"
 317        )
 318        choice_func = operator.attrgetter(related_field_name)
 319        qs = rel_model.query.complex_filter(limit_choices_to)
 320        if ordering:
 321            qs = qs.order_by(*ordering)
 322        return (blank_choice if include_blank else []) + [
 323            (choice_func(x), str(x)) for x in qs
 324        ]
 325
 326    def related_query_name(self) -> str:
 327        """
 328        Define the name that can be used to identify this related object in a
 329        table-spanning query.
 330        """
 331        return (
 332            self.remote_field.related_query_name or self.model.model_options.model_name
 333        )
 334
 335    @property
 336    def target_field(self) -> Field:
 337        """
 338        When filtering against this relation, return the field on the remote
 339        model against which the filtering should happen.
 340        """
 341        target_fields = self.path_infos[-1].target_fields
 342        if len(target_fields) > 1:
 343            raise FieldError(
 344                "The relation has multiple target fields, but only single target field "
 345                "was asked for"
 346            )
 347        return target_fields[0]
 348
 349    def get_cache_name(self) -> str:
 350        assert self.name is not None, "Field name must be set"
 351        return self.name
 352
 353
 354class ForeignKeyField(ColumnField, RelatedField):
 355    # Narrow the base class's `ForeignObjectRel` annotation — a FK's remote_field
 356    # is always a ForeignKeyRel with a concrete on_delete action.
 357    remote_field: ForeignKeyRel
 358
 359    """
 360    Provide a many-to-one relation by adding a column to the local model
 361    to hold the remote value.
 362
 363    ForeignKeyField targets the primary key (id) of the remote model.
 364    """
 365
 366    non_migration_attrs = (
 367        *RelatedField.non_migration_attrs,
 368        *ColumnField.non_migration_attrs,
 369        "on_delete",
 370    )
 371
 372    empty_strings_allowed = False
 373
 374    def __init__(
 375        self,
 376        to: type[Model] | str,
 377        on_delete: OnDelete,
 378        related_query_name: str | None = None,
 379        limit_choices_to: Any = None,
 380        db_constraint: bool = True,
 381        *,
 382        required: bool = True,
 383        allow_null: bool = False,
 384        validators: Sequence[Callable[..., Any]] = (),
 385    ):
 386        # `default` and `choices` are intentionally not accepted: a hardcoded
 387        # FK id default is a portability/existence footgun, and the related
 388        # model itself already defines the valid set. Use `limit_choices_to`
 389        # to constrain the target rows.
 390        if not isinstance(to, str):
 391            try:
 392                to.model_options.model_name
 393            except AttributeError:
 394                raise TypeError(
 395                    f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ForeignKeyField must be "
 396                    f"either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
 397                )
 398        if not isinstance(on_delete, OnDelete):
 399            raise TypeError(
 400                "on_delete must be one of plain.postgres.CASCADE, SET_NULL, "
 401                f"RESTRICT, or NO_ACTION; got {on_delete!r}"
 402            )
 403
 404        super().__init__(
 405            required=required,
 406            allow_null=allow_null,
 407            validators=validators,
 408        )
 409        self._related_query_name = related_query_name
 410        self._limit_choices_to = limit_choices_to
 411        self.remote_field = ForeignKeyRel(
 412            field=self,
 413            to=to,
 414            on_delete=on_delete,
 415            related_query_name=related_query_name,
 416            limit_choices_to=limit_choices_to,
 417        )
 418        self.db_constraint = db_constraint
 419
 420    def __copy__(self) -> ForeignKeyField:
 421        obj = super().__copy__()
 422        # Remove any cached PathInfo values.
 423        obj.__dict__.pop("path_infos", None)
 424        obj.__dict__.pop("reverse_path_infos", None)
 425        return obj
 426
 427    def __set__(self, instance: Any, value: Any) -> None:
 428        """
 429        Override Field's __set__ to clear cached related object when FK value changes.
 430
 431        This ensures that when you change obj.user_id, the cached obj.user is invalidated.
 432        """
 433        # Check if value is changing and clear cache if needed
 434        if (
 435            hasattr(self, "attname")
 436            and instance.__dict__.get(self.attname) != value
 437            and self.is_cached(instance)
 438        ):
 439            self.delete_cached_value(instance)
 440
 441        # Call parent's __set__ to do the actual assignment
 442        super().__set__(instance, value)
 443
 444    @cached_property
 445    def related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
 446        return self.resolve_related_fields()
 447
 448    @cached_property
 449    def reverse_related_fields(self) -> list[tuple[Field, Field]]:
 450        return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]
 451
 452    @cached_property
 453    def local_related_fields(self) -> tuple[Field, ...]:
 454        return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)
 455
 456    @cached_property
 457    def foreign_related_fields(self) -> tuple[Field, ...]:
 458        return tuple(
 459            rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
 460        )
 461
 462    def get_forward_related_filter(self, obj: Model) -> dict[str, Any]:
 463        """
 464        Return the keyword arguments that when supplied to
 465        self.model.object.filter(), would select all instances related through
 466        this field to the remote obj. This is used to build the querysets
 467        returned by related descriptors. obj is an instance of
 468        self.related_field.model.
 469        """
 470        return {
 471            f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
 472            for _, rh_field in self.related_fields
 473        }
 474
 475    def get_reverse_related_filter(self, obj: Model) -> Q:
 476        """
 477        Complement to get_forward_related_filter(). Return the keyword
 478        arguments that when passed to self.related_field.model.object.filter()
 479        select all instances of self.related_field.model related through
 480        this field to obj. obj is an instance of self.model.
 481        """
 482        return Q.create(
 483            [
 484                (rh_field.attname, getattr(obj, lh_field.attname))
 485                for lh_field, rh_field in self.related_fields
 486            ]
 487        )
 488
 489    def get_local_related_value(self, instance: Model) -> tuple[Any, ...]:
 490        # Always returns the value of the single local field
 491        field = self.local_related_fields[0]
 492        if field.primary_key:
 493            return (instance.id,)
 494        return (getattr(instance, field.attname),)
 495
 496    def get_foreign_related_value(self, instance: Model) -> tuple[Any, ...]:
 497        # Always returns the id of the foreign instance
 498        return (instance.id,)
 499
 500    def get_joining_columns(
 501        self, reverse_join: bool = False
 502    ) -> tuple[tuple[str, str], ...]:
 503        # Always returns a single column pair
 504        if reverse_join:
 505            from_field, to_field = self.related_fields[0]
 506            return ((to_field.column, from_field.column),)
 507        else:
 508            from_field, to_field = self.related_fields[0]
 509            return ((from_field.column, to_field.column),)
 510
 511    def get_reverse_joining_columns(self) -> tuple[tuple[str, str], ...]:
 512        return self.get_joining_columns(reverse_join=True)
 513
 514    def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
 515        """Get path from this field to the related model."""
 516        meta = self.remote_field.model._model_meta
 517        from_meta = self.model._model_meta
 518        return [
 519            PathInfo(
 520                from_meta=from_meta,
 521                to_meta=meta,
 522                target_fields=self.foreign_related_fields,
 523                join_field=self,
 524                m2m=False,
 525                direct=True,
 526                filtered_relation=filtered_relation,
 527            )
 528        ]
 529
 530    @cached_property
 531    def path_infos(self) -> list[PathInfo]:
 532        return self.get_path_info()
 533
 534    def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
 535        """Get path from the related model to this field's model."""
 536        meta = self.model._model_meta
 537        from_meta = self.remote_field.model._model_meta
 538        return [
 539            PathInfo(
 540                from_meta=from_meta,
 541                to_meta=meta,
 542                target_fields=(meta.get_forward_field("id"),),
 543                join_field=self.remote_field,
 544                m2m=not self.primary_key,
 545                direct=False,
 546                filtered_relation=filtered_relation,
 547            )
 548        ]
 549
 550    @cached_property
 551    def reverse_path_infos(self) -> list[PathInfo]:
 552        return self.get_reverse_path_info()
 553
 554    def contribute_to_class(self, cls: type[Model], name: str) -> None:
 555        super().contribute_to_class(cls, name)
 556        setattr(cls, name, ForwardForeignKeyDescriptor(self))
 557
 558    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 559        return [
 560            *super().preflight(**kwargs),
 561            *self._check_on_delete(),
 562        ]
 563
 564    def _check_on_delete(self) -> list[PreflightResult]:
 565        on_delete = getattr(self.remote_field, "on_delete", None)
 566        results: list[PreflightResult] = []
 567        if on_delete is SET_NULL and not self.allow_null:
 568            results.append(
 569                PreflightResult(
 570                    fix=(
 571                        "Field specifies on_delete=SET_NULL, but cannot be null. "
 572                        "Set allow_null=True argument on the field, or change the on_delete rule."
 573                    ),
 574                    obj=self,
 575                    id="fields.foreign_key_null_constraint_violation",
 576                )
 577            )
 578        if not self.db_constraint and on_delete is not NO_ACTION:
 579            results.append(
 580                PreflightResult(
 581                    fix=(
 582                        "db_constraint=False requires on_delete=NO_ACTION. "
 583                        "Without a FOREIGN KEY constraint there is no place for "
 584                        "Postgres to apply the delete behavior."
 585                    ),
 586                    obj=self,
 587                    id="fields.foreign_key_unconstrained_requires_no_action",
 588                )
 589            )
 590        return results
 591
 592    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
 593        name, path, args, kwargs = super().deconstruct()
 594        kwargs["on_delete"] = self.remote_field.on_delete
 595
 596        if isinstance(self.remote_field.model, str):
 597            if "." in self.remote_field.model:
 598                package_label, model_name = self.remote_field.model.split(".")
 599                kwargs["to"] = f"{package_label}.{model_name.lower()}"
 600            else:
 601                kwargs["to"] = self.remote_field.model.lower()
 602        else:
 603            kwargs["to"] = self.remote_field.model.model_options.label_lower
 604
 605        if self.db_constraint is not True:
 606            kwargs["db_constraint"] = self.db_constraint
 607
 608        return name, path, args, kwargs
 609
 610    def to_python(self, value: Any) -> Any:
 611        return self.target_field.to_python(value)
 612
 613    @property
 614    def target_field(self) -> Field:
 615        return self.foreign_related_fields[0]
 616
 617    def validate(self, value: Any, model_instance: Model) -> None:
 618        super().validate(value, model_instance)
 619        if value is None:
 620            return None
 621
 622        field_name = self.remote_field.field_name
 623        if field_name is None:
 624            raise ValueError("remote_field.field_name cannot be None")
 625        qs = self.remote_field.model._model_meta.base_queryset.filter(
 626            **{field_name: value}
 627        )
 628        qs = qs.complex_filter(self.get_limit_choices_to())
 629        if not qs.exists():
 630            raise exceptions.ValidationError(
 631                "%(model)s instance with %(field)s %(value)r does not exist.",
 632                code="invalid",
 633                params={
 634                    "model": self.remote_field.model.model_options.model_name,
 635                    "id": value,
 636                    "field": self.remote_field.field_name,
 637                    "value": value,
 638                },
 639            )
 640
 641    def resolve_related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
 642        if isinstance(self.remote_field.model, str):
 643            raise ValueError(
 644                f"Related model {self.remote_field.model!r} cannot be resolved"
 645            )
 646        from_field = self
 647        to_field = self.remote_field.model._model_meta.get_forward_field("id")
 648        related_fields: list[tuple[ForeignKeyField, Field]] = [(from_field, to_field)]
 649
 650        for from_field, to_field in related_fields:
 651            if to_field and to_field.model != self.remote_field.model:
 652                raise FieldError(
 653                    f"'{self.model.model_options.label}.{self.name}' refers to field '{to_field.name}' which is not local to model "
 654                    f"'{self.remote_field.model.model_options.label}'."
 655                )
 656        return related_fields
 657
 658    def get_attname(self) -> str:
 659        return f"{self.name}_id"
 660
 661    def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
 662        if value is None or (
 663            value == "" and not self.target_field.empty_strings_allowed
 664        ):
 665            return None
 666        else:
 667            return self.target_field.get_db_prep_save(value, connection=connection)
 668
 669    def get_db_prep_value(
 670        self, value: Any, connection: DatabaseConnection, prepared: bool = False
 671    ) -> Any:
 672        return self.target_field.get_db_prep_value(value, connection, prepared)
 673
 674    def get_prep_value(self, value: Any) -> Any:
 675        return self.target_field.get_prep_value(value)
 676
 677    def db_type(self) -> str | None:
 678        return self.target_field.rel_db_type()
 679
 680    def cast_db_type(self) -> str | None:
 681        return self.target_field.cast_db_type()
 682
 683    def get_col(self, alias: str | None, output_field: Field | None = None) -> Any:
 684        if output_field is None:
 685            output_field = self.target_field
 686            while isinstance(output_field, ForeignKeyField):
 687                output_field = output_field.target_field
 688                if output_field is self:
 689                    raise ValueError("Cannot resolve output_field.")
 690        return super().get_col(alias, output_field)
 691
 692
# Register the relation-aware lookup classes (imported from .related_lookups)
# on ForeignKeyField. These run once, at module import time.
ForeignKeyField.register_lookup(RelatedIn)
ForeignKeyField.register_lookup(RelatedExact)
ForeignKeyField.register_lookup(RelatedLessThan)
ForeignKeyField.register_lookup(RelatedGreaterThan)
ForeignKeyField.register_lookup(RelatedGreaterThanOrEqual)
ForeignKeyField.register_lookup(RelatedLessThanOrEqual)
ForeignKeyField.register_lookup(RelatedIsNull)
 701
 702
 703class ManyToManyField(RelatedField):
 704    """
 705    Provide a many-to-many relation by using an intermediary model that
 706    holds two ForeignKeyField fields pointed at the two sides of the relation.
 707
 708    Unless a ``through`` model was provided, ManyToManyField will use the
 709    create_many_to_many_intermediary_model factory to automatically generate
 710    the intermediary model.
 711    """
 712
 713    # ManyToManyField uses ManyToManyRel which has through/through_fields
 714    remote_field: ManyToManyRel
 715
 716    def __init__(
 717        self,
 718        to: type[Model] | str,
 719        *,
 720        through: type[Model] | str,
 721        through_fields: tuple[str, str] | None = None,
 722        related_query_name: str | None = None,
 723        limit_choices_to: Any = None,
 724        symmetrical: bool | None = None,
 725    ):
 726        # M2M has no database column, so `required`, `allow_null`, `default`,
 727        # `validators`, and `choices` are intentionally not accepted. Membership
 728        # is managed through the related manager; filter target rows with
 729        # `limit_choices_to`.
 730        if not isinstance(to, str):
 731            try:
 732                to._model_meta
 733            except AttributeError:
 734                raise TypeError(
 735                    f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ManyToManyField "
 736                    f"must be either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
 737                )
 738
 739        if symmetrical is None:
 740            symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
 741
 742        if not through:
 743            raise ValueError("ManyToManyField must have a 'through' argument.")
 744
 745        self.remote_field = ManyToManyRel(
 746            field=self,
 747            to=to,
 748            related_query_name=related_query_name,
 749            limit_choices_to=limit_choices_to,
 750            symmetrical=symmetrical,
 751            through=through,
 752            through_fields=through_fields,
 753        )
 754
 755        super().__init__()
 756        self._related_query_name = related_query_name
 757        self._limit_choices_to = limit_choices_to
 758
 759    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 760        return [
 761            *super().preflight(**kwargs),
 762            *self._check_relationship_model(**kwargs),
 763            *self._check_table_uniqueness(**kwargs),
 764        ]
 765
 766    def _check_relationship_model(
 767        self, from_model: type[Model] | None = None, **kwargs: Any
 768    ) -> list[PreflightResult]:
 769        if hasattr(self.remote_field.through, "_model_meta"):
 770            qualified_model_name = f"{self.remote_field.through.model_options.package_label}.{self.remote_field.through.__name__}"
 771        else:
 772            qualified_model_name = self.remote_field.through
 773
 774        errors = []
 775
 776        if self.remote_field.through not in self.meta.models_registry.get_models():
 777            # The relationship model is not installed.
 778            errors.append(
 779                PreflightResult(
 780                    fix=(
 781                        "Field specifies a many-to-many relation through model "
 782                        f"'{qualified_model_name}', which has not been installed. "
 783                        "Ensure the through model is properly defined and installed."
 784                    ),
 785                    obj=self,
 786                    id="fields.m2m_through_model_not_installed",
 787                )
 788            )
 789
 790        else:
 791            assert from_model is not None, (
 792                "ManyToManyField with intermediate "
 793                "tables cannot be checked if you don't pass the model "
 794                "where the field is attached to."
 795            )
 796            # Set some useful local variables
 797            to_model = resolve_relation(from_model, self.remote_field.model)
 798            from_model_name = from_model.model_options.object_name
 799            if isinstance(to_model, str):
 800                to_model_name = to_model
 801            else:
 802                to_model_name = to_model.model_options.object_name
 803            relationship_model_name = (
 804                self.remote_field.through.model_options.object_name
 805            )
 806            self_referential = from_model == to_model
 807            # Count foreign keys in intermediate model
 808            if self_referential:
 809                seen_self = sum(
 810                    from_model == field.remote_field.model
 811                    for field in self.remote_field.through._model_meta.fields
 812                    if isinstance(field, RelatedField)
 813                )
 814
 815                if seen_self > 2 and not self.remote_field.through_fields:
 816                    errors.append(
 817                        PreflightResult(
 818                            fix=(
 819                                "The model is used as an intermediate model by "
 820                                f"'{self}', but it has more than two foreign keys "
 821                                f"to '{from_model_name}', which is ambiguous. "
 822                                "Use through_fields to specify which two foreign keys "
 823                                "Plain should use."
 824                            ),
 825                            obj=self.remote_field.through,
 826                            id="fields.m2m_through_model_ambiguous_fks",
 827                        )
 828                    )
 829
 830            else:
 831                # Count foreign keys in relationship model
 832                seen_from = sum(
 833                    from_model == field.remote_field.model
 834                    for field in self.remote_field.through._model_meta.fields
 835                    if isinstance(field, RelatedField)
 836                )
 837                seen_to = sum(
 838                    to_model == field.remote_field.model
 839                    for field in self.remote_field.through._model_meta.fields
 840                    if isinstance(field, RelatedField)
 841                )
 842
 843                if seen_from > 1 and not self.remote_field.through_fields:
 844                    errors.append(
 845                        PreflightResult(
 846                            fix=(
 847                                "The model is used as an intermediate model by "
 848                                f"'{self}', but it has more than one foreign key "
 849                                f"from '{from_model_name}', which is ambiguous. You must specify "
 850                                "which foreign key Plain should use via the "
 851                                "through_fields keyword argument. "
 852                                "If you want to create a recursive relationship, "
 853                                f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
 854                            ),
 855                            obj=self,
 856                            id="fields.m2m_through_model_invalid_recursive_from",
 857                        )
 858                    )
 859
 860                if seen_to > 1 and not self.remote_field.through_fields:
 861                    errors.append(
 862                        PreflightResult(
 863                            fix=(
 864                                "The model is used as an intermediate model by "
 865                                f"'{self}', but it has more than one foreign key "
 866                                f"to '{to_model_name}', which is ambiguous. You must specify "
 867                                "which foreign key Plain should use via the "
 868                                "through_fields keyword argument. "
 869                                "If you want to create a recursive relationship, "
 870                                f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
 871                            ),
 872                            obj=self,
 873                            id="fields.m2m_through_model_invalid_recursive_to",
 874                        )
 875                    )
 876
 877                if seen_from == 0 or seen_to == 0:
 878                    errors.append(
 879                        PreflightResult(
 880                            fix=(
 881                                "The model is used as an intermediate model by "
 882                                f"'{self}', but it does not have a foreign key to '{from_model_name}' or '{to_model_name}'. "
 883                                "Add the required foreign keys to the through model."
 884                            ),
 885                            obj=self.remote_field.through,
 886                            id="fields.m2m_through_model_missing_fk",
 887                        )
 888                    )
 889
 890        # Validate `through_fields`.
 891        if self.remote_field.through_fields is not None:
 892            # Validate that we're given an iterable of at least two items
 893            # and that none of them is "falsy".
 894            if not (
 895                len(self.remote_field.through_fields) >= 2
 896                and self.remote_field.through_fields[0]
 897                and self.remote_field.through_fields[1]
 898            ):
 899                errors.append(
 900                    PreflightResult(
 901                        fix=(
 902                            "Field specifies 'through_fields' but does not provide "
 903                            "the names of the two link fields that should be used "
 904                            f"for the relation through model '{qualified_model_name}'. "
 905                            "Make sure you specify 'through_fields' as "
 906                            "through_fields=('field1', 'field2')."
 907                        ),
 908                        obj=self,
 909                        id="fields.m2m_through_fields_wrong_length",
 910                    )
 911                )
 912
 913            # Validate the given through fields -- they should be actual
 914            # fields on the through model, and also be foreign keys to the
 915            # expected models.
 916            else:
 917                assert from_model is not None, (
 918                    "ManyToManyField with intermediate "
 919                    "tables cannot be checked if you don't pass the model "
 920                    "where the field is attached to."
 921                )
 922
 923                source, through, target = (
 924                    from_model,
 925                    self.remote_field.through,
 926                    self.remote_field.model,
 927                )
 928                source_field_name, target_field_name = self.remote_field.through_fields[
 929                    :2
 930                ]
 931
 932                for field_name, related_model in (
 933                    (source_field_name, source),
 934                    (target_field_name, target),
 935                ):
 936                    possible_field_names = []
 937                    for f in through._model_meta.fields:
 938                        if (
 939                            hasattr(f, "remote_field")
 940                            and getattr(f.remote_field, "model", None) == related_model
 941                        ):
 942                            possible_field_names.append(f.name)
 943                    if possible_field_names:
 944                        fix = (
 945                            "Did you mean one of the following foreign keys to '{}': "
 946                            "{}?".format(
 947                                related_model.model_options.object_name,
 948                                ", ".join(possible_field_names),
 949                            )
 950                        )
 951                    else:
 952                        fix = ""
 953
 954                    try:
 955                        field = through._model_meta.get_forward_field(field_name)
 956                    except FieldDoesNotExist:
 957                        errors.append(
 958                            PreflightResult(
 959                                fix=f"The intermediary model '{qualified_model_name}' has no field '{field_name}'. {fix}",
 960                                obj=self,
 961                                id="fields.m2m_through_field_not_found",
 962                            )
 963                        )
 964                    else:
 965                        if not (
 966                            isinstance(field, RelatedField)
 967                            and field.remote_field.model == related_model
 968                        ):
 969                            errors.append(
 970                                PreflightResult(
 971                                    fix=f"'{through.model_options.object_name}.{field_name}' is not a foreign key to '{related_model.model_options.object_name}'. {fix}",
 972                                    obj=self,
 973                                    id="fields.m2m_through_field_not_fk_to_model",
 974                                )
 975                            )
 976
 977        return errors
 978
 979    def _check_table_uniqueness(self, **kwargs: Any) -> list[PreflightResult]:
 980        if isinstance(self.remote_field.through, str):
 981            return []
 982        registered_tables = {
 983            model.model_options.db_table: model
 984            for model in self.meta.models_registry.get_models()
 985            if model != self.remote_field.through
 986        }
 987        m2m_db_table = self.m2m_db_table()
 988        model = registered_tables.get(m2m_db_table)
 989        # Check if there's already a m2m field using the same through model.
 990        if model and model != self.remote_field.through:
 991            clashing_obj = model.model_options.label
 992            return [
 993                PreflightResult(
 994                    fix=(
 995                        f"The field's intermediary table '{m2m_db_table}' clashes with the "
 996                        f"table name of '{clashing_obj}'. "
 997                        "Change the through model's db_table or use a different model."
 998                    ),
 999                    obj=self,
1000                    id="fields.m2m_table_name_clash",
1001                )
1002            ]
1003        return []
1004
1005    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1006        name, path, args, kwargs = super().deconstruct()
1007
1008        if self.remote_field.db_constraint is not True:
1009            kwargs["db_constraint"] = self.remote_field.db_constraint
1010
1011        # Lowercase model names as they should be treated as case-insensitive.
1012        if isinstance(self.remote_field.model, str):
1013            if "." in self.remote_field.model:
1014                package_label, model_name = self.remote_field.model.split(".")
1015                kwargs["to"] = f"{package_label}.{model_name.lower()}"
1016            else:
1017                kwargs["to"] = self.remote_field.model.lower()
1018        else:
1019            kwargs["to"] = self.remote_field.model.model_options.label_lower
1020
1021        if isinstance(self.remote_field.through, str):
1022            kwargs["through"] = self.remote_field.through
1023        else:
1024            kwargs["through"] = self.remote_field.through.model_options.label
1025
1026        return name, path, args, kwargs
1027
1028    def _get_path_info(
1029        self, direct: bool = False, filtered_relation: Any = None
1030    ) -> list[PathInfo]:
1031        """Called by both direct and indirect m2m traversal."""
1032        int_model = self.remote_field.through
1033        # M2M through model fields are always ForeignKey
1034        linkfield1 = cast(
1035            ForeignKeyField,
1036            int_model._model_meta.get_forward_field(self.m2m_field_name()),
1037        )
1038        linkfield2 = cast(
1039            ForeignKeyField,
1040            int_model._model_meta.get_forward_field(self.m2m_reverse_field_name()),
1041        )
1042        if direct:
1043            join1infos = linkfield1.reverse_path_infos
1044            if filtered_relation:
1045                join2infos = linkfield2.get_path_info(filtered_relation)
1046            else:
1047                join2infos = linkfield2.path_infos
1048        else:
1049            join1infos = linkfield2.reverse_path_infos
1050            if filtered_relation:
1051                join2infos = linkfield1.get_path_info(filtered_relation)
1052            else:
1053                join2infos = linkfield1.path_infos
1054
1055        return [*join1infos, *join2infos]
1056
1057    def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1058        return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1059
1060    @cached_property
1061    def path_infos(self) -> list[PathInfo]:
1062        return self.get_path_info()
1063
1064    def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1065        return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1066
1067    @cached_property
1068    def reverse_path_infos(self) -> list[PathInfo]:
1069        return self.get_reverse_path_info()
1070
1071    def _get_m2m_db_table(self) -> str:
1072        """
1073        Function that can be curried to provide the m2m table name for this
1074        relation.
1075        """
1076        return self.remote_field.through.model_options.db_table
1077
1078    def _get_m2m_attr(self, related: Any, attr: str) -> Any:
1079        """
1080        Function that can be curried to provide the source accessor or DB
1081        column name for the m2m table.
1082        """
1083        cache_attr = f"_m2m_{attr}_cache"
1084        if hasattr(self, cache_attr):
1085            return getattr(self, cache_attr)
1086        if self.remote_field.through_fields is not None:
1087            link_field_name: str | None = self.remote_field.through_fields[0]
1088        else:
1089            link_field_name = None
1090        for f in self.remote_field.through._model_meta.fields:
1091            if (
1092                isinstance(f, RelatedField)
1093                and f.remote_field.model == related.related_model
1094                and (link_field_name is None or link_field_name == f.name)
1095            ):
1096                setattr(self, cache_attr, getattr(f, attr))
1097                return getattr(self, cache_attr)
1098        return None
1099
1100    def _get_m2m_reverse_attr(self, related: Any, attr: str) -> Any:
1101        """
1102        Function that can be curried to provide the related accessor or DB
1103        column name for the m2m table.
1104        """
1105        cache_attr = f"_m2m_reverse_{attr}_cache"
1106        if hasattr(self, cache_attr):
1107            return getattr(self, cache_attr)
1108        found = False
1109        if self.remote_field.through_fields is not None:
1110            link_field_name: str | None = self.remote_field.through_fields[1]
1111        else:
1112            link_field_name = None
1113        for f in self.remote_field.through._model_meta.fields:
1114            if isinstance(f, RelatedField) and f.remote_field.model == related.model:
1115                if link_field_name is None and related.related_model == related.model:
1116                    # If this is an m2m-intermediate to self,
1117                    # the first foreign key you find will be
1118                    # the source column. Keep searching for
1119                    # the second foreign key.
1120                    if found:
1121                        setattr(self, cache_attr, getattr(f, attr))
1122                        break
1123                    else:
1124                        found = True
1125                elif link_field_name is None or link_field_name == f.name:
1126                    setattr(self, cache_attr, getattr(f, attr))
1127                    break
1128        return getattr(self, cache_attr)
1129
    def contribute_to_class(self, cls: type[Model], name: str) -> None:
        """
        Attach this field to ``cls`` under ``name``.

        After the inherited setup, this registers a lazy operation that
        replaces ``remote_field.through`` with the model it is given, installs
        a ForwardManyToManyDescriptor on ``cls``, and exposes
        ``m2m_db_table`` on the field.
        """
        super().contribute_to_class(cls, name)

        def resolve_through_model(
            _: Any, model: type[Model], field: ManyToManyField
        ) -> None:
            # Store the model received from the lazy operation as `through`;
            # the first positional argument is unused.
            field.remote_field.through = model

        # NOTE(review): presumably the callback fires once `cls` and the
        # `through` reference are both available as models -- confirm against
        # lazy_related_operation.
        lazy_related_operation(
            resolve_through_model,
            cls,
            self.remote_field.through,
            field=self,
        )

        # Add the descriptor for the m2m relation.
        setattr(cls, self.name, ForwardManyToManyDescriptor(self.remote_field))  # ty: ignore[invalid-argument-type]

        # Set up the accessor for the m2m table name for the relation.
        self.m2m_db_table = self._get_m2m_db_table
1150
1151    def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
1152        """Set up M2M metadata accessors for the through table."""
1153        super().do_related_class(other, cls)
1154
1155        # Set up the accessors for the column names on the m2m table.
1156        # These are used during query construction and schema operations.
1157        related = self.remote_field
1158        self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1159        self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1160
1161        self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1162        self.m2m_reverse_field_name = partial(
1163            self._get_m2m_reverse_attr, related, "name"
1164        )
1165
1166        get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1167        self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1168        get_m2m_reverse_rel = partial(
1169            self._get_m2m_reverse_attr, related, "remote_field"
1170        )
1171        self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1172
1173    def set_attributes_from_rel(self) -> None:
1174        pass
1175
1176    def value_from_object(self, obj: Model) -> list[Any]:
1177        return [] if obj.id is None else list(getattr(obj, self.attname).query)
1178
1179    def save_form_data(self, instance: Model, data: Any) -> None:
1180        getattr(instance, self.attname).set(data)
1181
1182    def db_type(self) -> None:
1183        # A ManyToManyField is not represented by a single column,
1184        # so return None.
1185        return None