1from __future__ import annotations
   2
   3import copy
   4from functools import cached_property, partial
   5from typing import TYPE_CHECKING, Any, Self, cast
   6
   7from plain import exceptions
   8from plain.postgres.constants import LOOKUP_SEP
   9from plain.postgres.deletion import SET_DEFAULT, SET_NULL
  10from plain.postgres.exceptions import FieldDoesNotExist, FieldError
  11from plain.postgres.query_utils import PathInfo, Q
  12from plain.postgres.utils import make_model_tuple
  13from plain.preflight import PreflightResult
  14from plain.runtime import SettingsReference
  15
  16from ..registry import models_registry
  17from . import DbParameters, Field
  18from .mixins import FieldCacheMixin
  19from .related_descriptors import (
  20    ForwardForeignKeyDescriptor,
  21    ForwardManyToManyDescriptor,
  22)
  23from .related_lookups import (
  24    RelatedExact,
  25    RelatedGreaterThan,
  26    RelatedGreaterThanOrEqual,
  27    RelatedIn,
  28    RelatedIsNull,
  29    RelatedLessThan,
  30    RelatedLessThanOrEqual,
  31)
  32from .reverse_related import ForeignKeyRel, ManyToManyRel
  33
  34if TYPE_CHECKING:
  35    from plain.postgres.base import Model
  36    from plain.postgres.connection import DatabaseConnection
  37    from plain.postgres.fields.reverse_related import ForeignObjectRel
  38
  39RECURSIVE_RELATIONSHIP_CONSTANT = "self"
  40
  41
  42def resolve_relation(
  43    scope_model: type[Model], relation: type[Model] | str
  44) -> type[Model] | str:
  45    """
  46    Transform relation into a model or fully-qualified model string of the form
  47    "package_label.ModelName", relative to scope_model.
  48
  49    The relation argument can be:
  50      * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
  51        the model argument will be returned.
  52      * A bare model name without an package_label, in which case scope_model's
  53        package_label will be prepended.
  54      * An "package_label.ModelName" string.
  55      * A model class, which will be returned unchanged.
  56    """
  57    # Check for recursive relations
  58    if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
  59        relation = scope_model
  60
  61    # Look for an "app.Model" relation
  62    if isinstance(relation, str):
  63        if "." not in relation:
  64            relation = f"{scope_model.model_options.package_label}.{relation}"
  65
  66    return relation
  67
  68
  69def lazy_related_operation(
  70    function: Any, model: type[Model], *related_models: type[Model] | str, **kwargs: Any
  71) -> None:
  72    """
  73    Schedule `function` to be called once `model` and all `related_models`
  74    have been imported and registered with the app registry. `function` will
  75    be called with the newly-loaded model classes as its positional arguments,
  76    plus any optional keyword arguments.
  77
  78    The `model` argument must be a model class. Each subsequent positional
  79    argument is another model, or a reference to another model - see
  80    `resolve_relation()` for the various forms these may take. Any relative
  81    references will be resolved relative to `model`.
  82
  83    This is a convenience wrapper for `Packages.lazy_model_operation` - the app
  84    registry model used is the one found in `model._model_meta.models_registry`.
  85    """
  86    models = [model] + [resolve_relation(model, rel) for rel in related_models]
  87    model_keys = (make_model_tuple(m) for m in models)
  88    models_registry = model._model_meta.models_registry
  89    return models_registry.lazy_model_operation(
  90        partial(function, **kwargs), *model_keys
  91    )
  92
  93
  94class RelatedField(FieldCacheMixin, Field):
  95    """Base class that all relational fields inherit from."""
  96
  97    # RelatedField always has a remote_field (never None)
  98    remote_field: ForeignObjectRel
  99    # path_infos is implemented as @cached_property in subclasses (ForeignKey, ManyToManyField)
 100    path_infos: list[PathInfo]
 101
 102    def __init__(
 103        self,
 104        *,
 105        related_query_name: str | None = None,
 106        limit_choices_to: Any = None,
 107        **kwargs: Any,
 108    ):
 109        self._related_query_name = related_query_name
 110        self._limit_choices_to = limit_choices_to
 111        super().__init__(**kwargs)
 112
 113    def __deepcopy__(self, memodict: dict[int, Any]) -> Self:
 114        # Handle remote_field deepcopy for RelatedFields
 115        obj = super().__deepcopy__(memodict)
 116        obj.remote_field = copy.copy(self.remote_field)
 117        if hasattr(self.remote_field, "field") and self.remote_field.field is self:
 118            obj.remote_field.field = obj  # type: ignore[misc]
 119        return obj
 120
 121    @cached_property
 122    def related_model(self) -> type[Model]:
 123        # Can't cache this property until all the models are loaded.
 124        models_registry.check_ready()
 125        return self.remote_field.model
 126
 127    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 128        return [
 129            *super().preflight(**kwargs),
 130            *self._check_related_query_name_is_valid(),
 131            *self._check_relation_model_exists(),
 132            *self._check_clashes(),
 133        ]
 134
 135    def _check_related_query_name_is_valid(self) -> list[PreflightResult]:
 136        # Always validate related_query_name since it's still used for ORM queries
 137        # (e.g., User.query.filter(articles__title="..."))
 138        rel_query_name = self.related_query_name()
 139        errors: list[PreflightResult] = []
 140        if rel_query_name.endswith("_"):
 141            errors.append(
 142                PreflightResult(
 143                    fix=(
 144                        f"Reverse query name '{rel_query_name}' must not end with an underscore. "
 145                        "Use a different related_query_name."
 146                    ),
 147                    obj=self,
 148                    id="fields.related_field_accessor_clash",
 149                )
 150            )
 151        if LOOKUP_SEP in rel_query_name:
 152            errors.append(
 153                PreflightResult(
 154                    fix=(
 155                        f"Reverse query name '{rel_query_name}' must not contain '{LOOKUP_SEP}'. "
 156                        "Use a different related_query_name."
 157                    ),
 158                    obj=self,
 159                    id="fields.related_field_query_name_clash",
 160                )
 161            )
 162        return errors
 163
 164    def _check_relation_model_exists(self) -> list[PreflightResult]:
 165        rel_is_missing = (
 166            self.remote_field.model not in self.meta.models_registry.get_models()
 167        )
 168        rel_is_string = isinstance(self.remote_field.model, str)
 169        model_name = (
 170            self.remote_field.model
 171            if rel_is_string
 172            else self.remote_field.model.model_options.object_name
 173        )
 174        if rel_is_missing and rel_is_string:
 175            return [
 176                PreflightResult(
 177                    fix=(
 178                        f"Field defines a relation with model '{model_name}', which is either "
 179                        "not installed, or is abstract. Ensure the model is installed and not abstract."
 180                    ),
 181                    obj=self,
 182                    id="fields.related_model_not_installed",
 183                )
 184            ]
 185        return []
 186
 187    def _check_clashes(self) -> list[PreflightResult]:
 188        """Check accessor and reverse query name clashes."""
 189        from plain.postgres.base import ModelBase
 190
 191        errors: list[PreflightResult] = []
 192
 193        # f.remote_field.model may be a string instead of a model. Skip if
 194        # model name is not resolved.
 195        if not isinstance(self.remote_field.model, ModelBase):
 196            return []
 197
 198        # Consider that we are checking field `Model.foreign` and the models
 199        # are:
 200        #
 201        #     class Target(models.Model):
 202        #         model = models.IntegerField()
 203        #         model_set = models.IntegerField()
 204        #
 205        #     class Model(models.Model):
 206        #         foreign = models.ForeignKeyField(Target)
 207        #         m2m = models.ManyToManyField(Target)
 208
 209        # rel_options.object_name == "Target"
 210        rel_meta = self.remote_field.model._model_meta
 211        rel_options = self.remote_field.model.model_options
 212        rel_query_name = self.related_query_name()  # i. e. "model"
 213        # i.e. "package_label.Model.field".
 214        field_name = f"{self.model.model_options.label}.{self.name}"
 215
 216        # Check clashes between reverse query name of `field`
 217        # and any other field name.
 218        potential_clashes = rel_meta.fields + rel_meta.many_to_many
 219        for clash_field in potential_clashes:
 220            # i.e. "package_label.Target.model_set".
 221            clash_name = f"{rel_options.label}.{clash_field.name}"
 222            if clash_field.name == rel_query_name:
 223                errors.append(
 224                    PreflightResult(
 225                        fix=(
 226                            f"Reverse query name for '{field_name}' clashes with field name '{clash_name}'. "
 227                            f"Rename field '{clash_name}' or use a different related_query_name."
 228                        ),
 229                        obj=self,
 230                        id="fields.related_accessor_clash_manager",
 231                    )
 232                )
 233
 234        return errors
 235
 236    def db_type(self) -> str | None:
 237        # By default related field will not have a column as it relates to
 238        # columns from another table.
 239        return None
 240
 241    def contribute_to_class(self, cls: type[Model], name: str) -> None:
 242        super().contribute_to_class(cls, name)
 243
 244        self.meta = cls._model_meta
 245
 246        if self.remote_field.related_query_name:
 247            related_query_name = self.remote_field.related_query_name % {
 248                "class": cls.__name__.lower(),
 249                "package_label": cls.model_options.package_label.lower(),
 250            }
 251            self.remote_field.related_query_name = related_query_name
 252
 253        def resolve_related_class(
 254            model: type[Model], related: type[Model], field: RelatedField
 255        ) -> None:
 256            field.remote_field.model = related
 257            field.do_related_class(related, model)
 258
 259        lazy_related_operation(
 260            resolve_related_class,
 261            cls,
 262            self.remote_field.model,
 263            field=self,
 264        )
 265
 266    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
 267        name, path, args, kwargs = super().deconstruct()
 268        if self._limit_choices_to:
 269            kwargs["limit_choices_to"] = self._limit_choices_to
 270        if self._related_query_name is not None:
 271            kwargs["related_query_name"] = self._related_query_name
 272        return name, path, args, kwargs
 273
 274    def set_attributes_from_rel(self) -> None:
 275        self.name = self.name or (
 276            self.remote_field.model.model_options.model_name + "_" + "id"
 277        )
 278        self.remote_field.set_field_name()
 279
 280    def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
 281        self.set_attributes_from_rel()
 282
 283    def get_limit_choices_to(self) -> Any:
 284        """
 285        Return ``limit_choices_to`` for this model field.
 286
 287        If it is a callable, it will be invoked and the result will be
 288        returned.
 289        """
 290        if callable(self.remote_field.limit_choices_to):
 291            return self.remote_field.limit_choices_to()  # type: ignore[call-top-callable]
 292        return self.remote_field.limit_choices_to
 293
 294    def related_query_name(self) -> str:
 295        """
 296        Define the name that can be used to identify this related object in a
 297        table-spanning query.
 298        """
 299        return (
 300            self.remote_field.related_query_name or self.model.model_options.model_name
 301        )
 302
 303    @property
 304    def target_field(self) -> Field:
 305        """
 306        When filtering against this relation, return the field on the remote
 307        model against which the filtering should happen.
 308        """
 309        target_fields = self.path_infos[-1].target_fields
 310        if len(target_fields) > 1:
 311            raise FieldError(
 312                "The relation has multiple target fields, but only single target field "
 313                "was asked for"
 314            )
 315        return target_fields[0]
 316
 317    def get_cache_name(self) -> str:
 318        assert self.name is not None, "Field name must be set"
 319        return self.name
 320
 321
 322class ForeignKeyField(RelatedField):
 323    """
 324    Provide a many-to-one relation by adding a column to the local model
 325    to hold the remote value.
 326
 327    ForeignKeyField targets the primary key (id) of the remote model.
 328    """
 329
 330    empty_strings_allowed = False
 331    default_error_messages = {
 332        "invalid": "%(model)s instance with %(field)s %(value)r does not exist."
 333    }
 334    description = "Foreign Key (type determined by related field)"
 335
 336    def __init__(
 337        self,
 338        to: type[Model] | str,
 339        on_delete: Any,
 340        related_query_name: str | None = None,
 341        limit_choices_to: Any = None,
 342        db_index: bool = True,
 343        db_constraint: bool = True,
 344        **kwargs: Any,
 345    ):
 346        if not isinstance(to, str):
 347            try:
 348                to.model_options.model_name
 349            except AttributeError:
 350                raise TypeError(
 351                    f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ForeignKeyField must be "
 352                    f"either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
 353                )
 354        if not callable(on_delete):
 355            raise TypeError("on_delete must be callable.")
 356
 357        self.remote_field = ForeignKeyRel(
 358            self,
 359            to,
 360            related_query_name=related_query_name,
 361            limit_choices_to=limit_choices_to,
 362            on_delete=on_delete,
 363        )
 364
 365        super().__init__(
 366            related_query_name=related_query_name,
 367            limit_choices_to=limit_choices_to,
 368            **kwargs,
 369        )
 370        self.db_index = db_index
 371        self.db_constraint = db_constraint
 372
 373    def __copy__(self) -> ForeignKeyField:
 374        obj = super().__copy__()
 375        # Remove any cached PathInfo values.
 376        obj.__dict__.pop("path_infos", None)
 377        obj.__dict__.pop("reverse_path_infos", None)
 378        return obj
 379
 380    def __set__(self, instance: Any, value: Any) -> None:
 381        """
 382        Override Field's __set__ to clear cached related object when FK value changes.
 383
 384        This ensures that when you change obj.user_id, the cached obj.user is invalidated.
 385        """
 386        # Check if value is changing and clear cache if needed
 387        if (
 388            hasattr(self, "attname")
 389            and instance.__dict__.get(self.attname) != value
 390            and self.is_cached(instance)
 391        ):
 392            self.delete_cached_value(instance)
 393
 394        # Call parent's __set__ to do the actual assignment
 395        super().__set__(instance, value)
 396
 397    @cached_property
 398    def related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
 399        return self.resolve_related_fields()
 400
 401    @cached_property
 402    def reverse_related_fields(self) -> list[tuple[Field, Field]]:
 403        return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]
 404
 405    @cached_property
 406    def local_related_fields(self) -> tuple[Field, ...]:
 407        return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)
 408
 409    @cached_property
 410    def foreign_related_fields(self) -> tuple[Field, ...]:
 411        return tuple(
 412            rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
 413        )
 414
 415    def get_forward_related_filter(self, obj: Model) -> dict[str, Any]:
 416        """
 417        Return the keyword arguments that when supplied to
 418        self.model.object.filter(), would select all instances related through
 419        this field to the remote obj. This is used to build the querysets
 420        returned by related descriptors. obj is an instance of
 421        self.related_field.model.
 422        """
 423        return {
 424            f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
 425            for _, rh_field in self.related_fields
 426        }
 427
 428    def get_reverse_related_filter(self, obj: Model) -> Q:
 429        """
 430        Complement to get_forward_related_filter(). Return the keyword
 431        arguments that when passed to self.related_field.model.object.filter()
 432        select all instances of self.related_field.model related through
 433        this field to obj. obj is an instance of self.model.
 434        """
 435        return Q.create(
 436            [
 437                (rh_field.attname, getattr(obj, lh_field.attname))
 438                for lh_field, rh_field in self.related_fields
 439            ]
 440        )
 441
 442    def get_local_related_value(self, instance: Model) -> tuple[Any, ...]:
 443        # Always returns the value of the single local field
 444        field = self.local_related_fields[0]
 445        if field.primary_key:
 446            return (instance.id,)
 447        return (getattr(instance, field.attname),)
 448
 449    def get_foreign_related_value(self, instance: Model) -> tuple[Any, ...]:
 450        # Always returns the id of the foreign instance
 451        return (instance.id,)
 452
 453    def get_joining_columns(
 454        self, reverse_join: bool = False
 455    ) -> tuple[tuple[str, str], ...]:
 456        # Always returns a single column pair
 457        if reverse_join:
 458            from_field, to_field = self.related_fields[0]
 459            return ((to_field.column, from_field.column),)
 460        else:
 461            from_field, to_field = self.related_fields[0]
 462            return ((from_field.column, to_field.column),)
 463
 464    def get_reverse_joining_columns(self) -> tuple[tuple[str, str], ...]:
 465        return self.get_joining_columns(reverse_join=True)
 466
 467    def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
 468        """Get path from this field to the related model."""
 469        meta = self.remote_field.model._model_meta
 470        from_meta = self.model._model_meta
 471        return [
 472            PathInfo(
 473                from_meta=from_meta,
 474                to_meta=meta,
 475                target_fields=self.foreign_related_fields,
 476                join_field=self,
 477                m2m=False,
 478                direct=True,
 479                filtered_relation=filtered_relation,
 480            )
 481        ]
 482
 483    @cached_property
 484    def path_infos(self) -> list[PathInfo]:
 485        return self.get_path_info()
 486
 487    def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
 488        """Get path from the related model to this field's model."""
 489        meta = self.model._model_meta
 490        from_meta = self.remote_field.model._model_meta
 491        return [
 492            PathInfo(
 493                from_meta=from_meta,
 494                to_meta=meta,
 495                target_fields=(meta.get_forward_field("id"),),
 496                join_field=self.remote_field,
 497                m2m=not self.primary_key,
 498                direct=False,
 499                filtered_relation=filtered_relation,
 500            )
 501        ]
 502
 503    @cached_property
 504    def reverse_path_infos(self) -> list[PathInfo]:
 505        return self.get_reverse_path_info()
 506
 507    def contribute_to_class(self, cls: type[Model], name: str) -> None:
 508        super().contribute_to_class(cls, name)
 509        setattr(cls, name, ForwardForeignKeyDescriptor(self))
 510
 511    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 512        return [
 513            *super().preflight(**kwargs),
 514            *self._check_on_delete(),
 515        ]
 516
 517    def _check_on_delete(self) -> list[PreflightResult]:
 518        on_delete = getattr(self.remote_field, "on_delete", None)
 519        if on_delete == SET_NULL and not self.allow_null:
 520            return [
 521                PreflightResult(
 522                    fix=(
 523                        "Field specifies on_delete=SET_NULL, but cannot be null. "
 524                        "Set allow_null=True argument on the field, or change the on_delete rule."
 525                    ),
 526                    obj=self,
 527                    id="fields.foreign_key_null_constraint_violation",
 528                )
 529            ]
 530        elif on_delete == SET_DEFAULT and not self.has_default():
 531            return [
 532                PreflightResult(
 533                    fix=(
 534                        "Field specifies on_delete=SET_DEFAULT, but has no default value. "
 535                        "Set a default value, or change the on_delete rule."
 536                    ),
 537                    obj=self,
 538                    id="fields.foreign_key_set_default_no_default",
 539                )
 540            ]
 541        else:
 542            return []
 543
 544    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
 545        name, path, args, kwargs = super().deconstruct()
 546        kwargs["on_delete"] = self.remote_field.on_delete
 547
 548        if isinstance(self.remote_field.model, SettingsReference):
 549            kwargs["to"] = self.remote_field.model
 550        elif isinstance(self.remote_field.model, str):
 551            if "." in self.remote_field.model:
 552                package_label, model_name = self.remote_field.model.split(".")
 553                kwargs["to"] = f"{package_label}.{model_name.lower()}"
 554            else:
 555                kwargs["to"] = self.remote_field.model.lower()
 556        else:
 557            kwargs["to"] = self.remote_field.model.model_options.label_lower
 558
 559        if self.db_index is not True:
 560            kwargs["db_index"] = self.db_index
 561
 562        if self.db_constraint is not True:
 563            kwargs["db_constraint"] = self.db_constraint
 564
 565        return name, path, args, kwargs
 566
 567    def to_python(self, value: Any) -> Any:
 568        return self.target_field.to_python(value)
 569
 570    @property
 571    def target_field(self) -> Field:
 572        return self.foreign_related_fields[0]
 573
 574    def validate(self, value: Any, model_instance: Model) -> None:
 575        super().validate(value, model_instance)
 576        if value is None:
 577            return None
 578
 579        field_name = self.remote_field.field_name
 580        if field_name is None:
 581            raise ValueError("remote_field.field_name cannot be None")
 582        qs = self.remote_field.model._model_meta.base_queryset.filter(
 583            **{field_name: value}
 584        )
 585        qs = qs.complex_filter(self.get_limit_choices_to())
 586        if not qs.exists():
 587            raise exceptions.ValidationError(
 588                self.error_messages["invalid"],
 589                code="invalid",
 590                params={
 591                    "model": self.remote_field.model.model_options.model_name,
 592                    "id": value,
 593                    "field": self.remote_field.field_name,
 594                    "value": value,
 595                },
 596            )
 597
 598    def resolve_related_fields(self) -> list[tuple[ForeignKeyField, Field]]:
 599        if isinstance(self.remote_field.model, str):
 600            raise ValueError(
 601                f"Related model {self.remote_field.model!r} cannot be resolved"
 602            )
 603        from_field = self
 604        to_field = self.remote_field.model._model_meta.get_forward_field("id")
 605        related_fields: list[tuple[ForeignKeyField, Field]] = [(from_field, to_field)]
 606
 607        for from_field, to_field in related_fields:
 608            if to_field and to_field.model != self.remote_field.model:
 609                raise FieldError(
 610                    f"'{self.model.model_options.label}.{self.name}' refers to field '{to_field.name}' which is not local to model "
 611                    f"'{self.remote_field.model.model_options.label}'."
 612                )
 613        return related_fields
 614
 615    def get_attname(self) -> str:
 616        return f"{self.name}_id"
 617
 618    def get_default(self) -> Any:
 619        """Return the to_field if the default value is an object."""
 620        field_default = super().get_default()
 621        if isinstance(field_default, self.remote_field.model):
 622            return getattr(field_default, self.target_field.attname)
 623        return field_default
 624
 625    def get_db_prep_save(self, value: Any, connection: DatabaseConnection) -> Any:
 626        if value is None or (
 627            value == "" and not self.target_field.empty_strings_allowed
 628        ):
 629            return None
 630        else:
 631            return self.target_field.get_db_prep_save(value, connection=connection)
 632
 633    def get_db_prep_value(
 634        self, value: Any, connection: DatabaseConnection, prepared: bool = False
 635    ) -> Any:
 636        return self.target_field.get_db_prep_value(value, connection, prepared)
 637
 638    def get_prep_value(self, value: Any) -> Any:
 639        return self.target_field.get_prep_value(value)
 640
 641    def db_check(self) -> None:
 642        return None
 643
 644    def db_type(self) -> str | None:
 645        return self.target_field.rel_db_type()
 646
 647    def cast_db_type(self) -> str | None:
 648        return self.target_field.cast_db_type()
 649
 650    def db_parameters(self) -> DbParameters:
 651        return {
 652            "type": self.db_type(),
 653            "check": self.db_check(),
 654        }
 655
 656    def get_col(self, alias: str | None, output_field: Field | None = None) -> Any:
 657        if output_field is None:
 658            output_field = self.target_field
 659            while isinstance(output_field, ForeignKeyField):
 660                output_field = output_field.target_field
 661                if output_field is self:
 662                    raise ValueError("Cannot resolve output_field.")
 663        return super().get_col(alias, output_field)
 664
 665
 666# Register lookups for ForeignKey
 667ForeignKeyField.register_lookup(RelatedIn)
 668ForeignKeyField.register_lookup(RelatedExact)
 669ForeignKeyField.register_lookup(RelatedLessThan)
 670ForeignKeyField.register_lookup(RelatedGreaterThan)
 671ForeignKeyField.register_lookup(RelatedGreaterThanOrEqual)
 672ForeignKeyField.register_lookup(RelatedLessThanOrEqual)
 673ForeignKeyField.register_lookup(RelatedIsNull)
 674
 675
 676class ManyToManyField(RelatedField):
 677    """
 678    Provide a many-to-many relation by using an intermediary model that
 679    holds two ForeignKeyField fields pointed at the two sides of the relation.
 680
 681    Unless a ``through`` model was provided, ManyToManyField will use the
 682    create_many_to_many_intermediary_model factory to automatically generate
 683    the intermediary model.
 684    """
 685
 686    # ManyToManyField uses ManyToManyRel which has through/through_fields
 687    remote_field: ManyToManyRel
 688
 689    description = "Many-to-many relationship"
 690
 691    def __init__(
 692        self,
 693        to: type[Model] | str,
 694        *,
 695        through: type[Model] | str,
 696        through_fields: tuple[str, str] | None = None,
 697        related_query_name: str | None = None,
 698        limit_choices_to: Any = None,
 699        symmetrical: bool | None = None,
 700        **kwargs: Any,
 701    ):
 702        if not isinstance(to, str):
 703            try:
 704                to._model_meta
 705            except AttributeError:
 706                raise TypeError(
 707                    f"{self.__class__.__name__}({to!r}) is invalid. First parameter to ManyToManyField "
 708                    f"must be either a model, a model name, or the string {RECURSIVE_RELATIONSHIP_CONSTANT!r}"
 709                )
 710
 711        if symmetrical is None:
 712            symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
 713
 714        if not through:
 715            raise ValueError("ManyToManyField must have a 'through' argument.")
 716
 717        self.remote_field = ManyToManyRel(
 718            self,
 719            to,
 720            related_query_name=related_query_name,
 721            limit_choices_to=limit_choices_to,
 722            symmetrical=symmetrical,
 723            through=through,
 724            through_fields=through_fields,
 725        )
 726        self.has_null_arg = "allow_null" in kwargs
 727
 728        super().__init__(
 729            related_query_name=related_query_name,
 730            limit_choices_to=limit_choices_to,
 731            **kwargs,
 732        )
 733
 734    def preflight(self, **kwargs: Any) -> list[PreflightResult]:
 735        return [
 736            *super().preflight(**kwargs),
 737            *self._check_relationship_model(**kwargs),
 738            *self._check_ignored_options(**kwargs),
 739            *self._check_table_uniqueness(**kwargs),
 740        ]
 741
 742    def _check_ignored_options(self, **kwargs: Any) -> list[PreflightResult]:
 743        warnings: list[PreflightResult] = []
 744
 745        if self.has_null_arg:
 746            warnings.append(
 747                PreflightResult(
 748                    fix="The 'null' option has no effect on ManyToManyField. Remove the 'null' argument.",
 749                    obj=self,
 750                    id="fields.m2m_null_has_no_effect",
 751                    warning=True,
 752                )
 753            )
 754
 755        if self._validators:
 756            warnings.append(
 757                PreflightResult(
 758                    fix="ManyToManyField does not support validators. Remove validators from this field.",
 759                    obj=self,
 760                    id="fields.m2m_validators_not_supported",
 761                    warning=True,
 762                )
 763            )
 764
 765        return warnings
 766
 767    def _check_relationship_model(
 768        self, from_model: type[Model] | None = None, **kwargs: Any
 769    ) -> list[PreflightResult]:
 770        if hasattr(self.remote_field.through, "_model_meta"):
 771            qualified_model_name = f"{self.remote_field.through.model_options.package_label}.{self.remote_field.through.__name__}"
 772        else:
 773            qualified_model_name = self.remote_field.through
 774
 775        errors = []
 776
 777        if self.remote_field.through not in self.meta.models_registry.get_models():
 778            # The relationship model is not installed.
 779            errors.append(
 780                PreflightResult(
 781                    fix=(
 782                        "Field specifies a many-to-many relation through model "
 783                        f"'{qualified_model_name}', which has not been installed. "
 784                        "Ensure the through model is properly defined and installed."
 785                    ),
 786                    obj=self,
 787                    id="fields.m2m_through_model_not_installed",
 788                )
 789            )
 790
 791        else:
 792            assert from_model is not None, (
 793                "ManyToManyField with intermediate "
 794                "tables cannot be checked if you don't pass the model "
 795                "where the field is attached to."
 796            )
 797            # Set some useful local variables
 798            to_model = resolve_relation(from_model, self.remote_field.model)
 799            from_model_name = from_model.model_options.object_name
 800            if isinstance(to_model, str):
 801                to_model_name = to_model
 802            else:
 803                to_model_name = to_model.model_options.object_name
 804            relationship_model_name = (
 805                self.remote_field.through.model_options.object_name
 806            )
 807            self_referential = from_model == to_model
 808            # Count foreign keys in intermediate model
 809            if self_referential:
 810                seen_self = sum(
 811                    from_model == field.remote_field.model
 812                    for field in self.remote_field.through._model_meta.fields
 813                    if isinstance(field, RelatedField)
 814                )
 815
 816                if seen_self > 2 and not self.remote_field.through_fields:
 817                    errors.append(
 818                        PreflightResult(
 819                            fix=(
 820                                "The model is used as an intermediate model by "
 821                                f"'{self}', but it has more than two foreign keys "
 822                                f"to '{from_model_name}', which is ambiguous. "
 823                                "Use through_fields to specify which two foreign keys "
 824                                "Plain should use."
 825                            ),
 826                            obj=self.remote_field.through,
 827                            id="fields.m2m_through_model_ambiguous_fks",
 828                        )
 829                    )
 830
 831            else:
 832                # Count foreign keys in relationship model
 833                seen_from = sum(
 834                    from_model == field.remote_field.model
 835                    for field in self.remote_field.through._model_meta.fields
 836                    if isinstance(field, RelatedField)
 837                )
 838                seen_to = sum(
 839                    to_model == field.remote_field.model
 840                    for field in self.remote_field.through._model_meta.fields
 841                    if isinstance(field, RelatedField)
 842                )
 843
 844                if seen_from > 1 and not self.remote_field.through_fields:
 845                    errors.append(
 846                        PreflightResult(
 847                            fix=(
 848                                "The model is used as an intermediate model by "
 849                                f"'{self}', but it has more than one foreign key "
 850                                f"from '{from_model_name}', which is ambiguous. You must specify "
 851                                "which foreign key Plain should use via the "
 852                                "through_fields keyword argument. "
 853                                "If you want to create a recursive relationship, "
 854                                f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
 855                            ),
 856                            obj=self,
 857                            id="fields.m2m_through_model_invalid_recursive_from",
 858                        )
 859                    )
 860
 861                if seen_to > 1 and not self.remote_field.through_fields:
 862                    errors.append(
 863                        PreflightResult(
 864                            fix=(
 865                                "The model is used as an intermediate model by "
 866                                f"'{self}', but it has more than one foreign key "
 867                                f"to '{to_model_name}', which is ambiguous. You must specify "
 868                                "which foreign key Plain should use via the "
 869                                "through_fields keyword argument. "
 870                                "If you want to create a recursive relationship, "
 871                                f'use ManyToManyField("{RECURSIVE_RELATIONSHIP_CONSTANT}", through="{relationship_model_name}").'
 872                            ),
 873                            obj=self,
 874                            id="fields.m2m_through_model_invalid_recursive_to",
 875                        )
 876                    )
 877
 878                if seen_from == 0 or seen_to == 0:
 879                    errors.append(
 880                        PreflightResult(
 881                            fix=(
 882                                "The model is used as an intermediate model by "
 883                                f"'{self}', but it does not have a foreign key to '{from_model_name}' or '{to_model_name}'. "
 884                                "Add the required foreign keys to the through model."
 885                            ),
 886                            obj=self.remote_field.through,
 887                            id="fields.m2m_through_model_missing_fk",
 888                        )
 889                    )
 890
 891        # Validate `through_fields`.
 892        if self.remote_field.through_fields is not None:
 893            # Validate that we're given an iterable of at least two items
 894            # and that none of them is "falsy".
 895            if not (
 896                len(self.remote_field.through_fields) >= 2
 897                and self.remote_field.through_fields[0]
 898                and self.remote_field.through_fields[1]
 899            ):
 900                errors.append(
 901                    PreflightResult(
 902                        fix=(
 903                            "Field specifies 'through_fields' but does not provide "
 904                            "the names of the two link fields that should be used "
 905                            f"for the relation through model '{qualified_model_name}'. "
 906                            "Make sure you specify 'through_fields' as "
 907                            "through_fields=('field1', 'field2')."
 908                        ),
 909                        obj=self,
 910                        id="fields.m2m_through_fields_wrong_length",
 911                    )
 912                )
 913
 914            # Validate the given through fields -- they should be actual
 915            # fields on the through model, and also be foreign keys to the
 916            # expected models.
 917            else:
 918                assert from_model is not None, (
 919                    "ManyToManyField with intermediate "
 920                    "tables cannot be checked if you don't pass the model "
 921                    "where the field is attached to."
 922                )
 923
 924                source, through, target = (
 925                    from_model,
 926                    self.remote_field.through,
 927                    self.remote_field.model,
 928                )
 929                source_field_name, target_field_name = self.remote_field.through_fields[
 930                    :2
 931                ]
 932
 933                for field_name, related_model in (
 934                    (source_field_name, source),
 935                    (target_field_name, target),
 936                ):
 937                    possible_field_names = []
 938                    for f in through._model_meta.fields:
 939                        if (
 940                            hasattr(f, "remote_field")
 941                            and getattr(f.remote_field, "model", None) == related_model
 942                        ):
 943                            possible_field_names.append(f.name)
 944                    if possible_field_names:
 945                        fix = (
 946                            "Did you mean one of the following foreign keys to '{}': "
 947                            "{}?".format(
 948                                related_model.model_options.object_name,
 949                                ", ".join(possible_field_names),
 950                            )
 951                        )
 952                    else:
 953                        fix = ""
 954
 955                    try:
 956                        field = through._model_meta.get_forward_field(field_name)
 957                    except FieldDoesNotExist:
 958                        errors.append(
 959                            PreflightResult(
 960                                fix=f"The intermediary model '{qualified_model_name}' has no field '{field_name}'. {fix}",
 961                                obj=self,
 962                                id="fields.m2m_through_field_not_found",
 963                            )
 964                        )
 965                    else:
 966                        if not (
 967                            isinstance(field, RelatedField)
 968                            and field.remote_field.model == related_model
 969                        ):
 970                            errors.append(
 971                                PreflightResult(
 972                                    fix=f"'{through.model_options.object_name}.{field_name}' is not a foreign key to '{related_model.model_options.object_name}'. {fix}",
 973                                    obj=self,
 974                                    id="fields.m2m_through_field_not_fk_to_model",
 975                                )
 976                            )
 977
 978        return errors
 979
 980    def _check_table_uniqueness(self, **kwargs: Any) -> list[PreflightResult]:
 981        if isinstance(self.remote_field.through, str):
 982            return []
 983        registered_tables = {
 984            model.model_options.db_table: model
 985            for model in self.meta.models_registry.get_models()
 986            if model != self.remote_field.through
 987        }
 988        m2m_db_table = self.m2m_db_table()
 989        model = registered_tables.get(m2m_db_table)
 990        # Check if there's already a m2m field using the same through model.
 991        if model and model != self.remote_field.through:
 992            clashing_obj = model.model_options.label
 993            return [
 994                PreflightResult(
 995                    fix=(
 996                        f"The field's intermediary table '{m2m_db_table}' clashes with the "
 997                        f"table name of '{clashing_obj}'. "
 998                        "Change the through model's db_table or use a different model."
 999                    ),
1000                    obj=self,
1001                    id="fields.m2m_table_name_clash",
1002                )
1003            ]
1004        return []
1005
1006    def deconstruct(self) -> tuple[str | None, str, list[Any], dict[str, Any]]:
1007        name, path, args, kwargs = super().deconstruct()
1008
1009        if self.remote_field.db_constraint is not True:
1010            kwargs["db_constraint"] = self.remote_field.db_constraint
1011
1012        # Lowercase model names as they should be treated as case-insensitive.
1013        if isinstance(self.remote_field.model, str):
1014            if "." in self.remote_field.model:
1015                package_label, model_name = self.remote_field.model.split(".")
1016                kwargs["to"] = f"{package_label}.{model_name.lower()}"
1017            else:
1018                kwargs["to"] = self.remote_field.model.lower()
1019        else:
1020            kwargs["to"] = self.remote_field.model.model_options.label_lower
1021
1022        if isinstance(self.remote_field.through, str):
1023            kwargs["through"] = self.remote_field.through
1024        else:
1025            kwargs["through"] = self.remote_field.through.model_options.label
1026
1027        return name, path, args, kwargs
1028
1029    def _get_path_info(
1030        self, direct: bool = False, filtered_relation: Any = None
1031    ) -> list[PathInfo]:
1032        """Called by both direct and indirect m2m traversal."""
1033        int_model = self.remote_field.through
1034        # M2M through model fields are always ForeignKey
1035        linkfield1 = cast(
1036            ForeignKeyField,
1037            int_model._model_meta.get_forward_field(self.m2m_field_name()),
1038        )
1039        linkfield2 = cast(
1040            ForeignKeyField,
1041            int_model._model_meta.get_forward_field(self.m2m_reverse_field_name()),
1042        )
1043        if direct:
1044            join1infos = linkfield1.reverse_path_infos
1045            if filtered_relation:
1046                join2infos = linkfield2.get_path_info(filtered_relation)
1047            else:
1048                join2infos = linkfield2.path_infos
1049        else:
1050            join1infos = linkfield2.reverse_path_infos
1051            if filtered_relation:
1052                join2infos = linkfield1.get_path_info(filtered_relation)
1053            else:
1054                join2infos = linkfield1.path_infos
1055
1056        return [*join1infos, *join2infos]
1057
1058    def get_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1059        return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1060
1061    @cached_property
1062    def path_infos(self) -> list[PathInfo]:
1063        return self.get_path_info()
1064
1065    def get_reverse_path_info(self, filtered_relation: Any = None) -> list[PathInfo]:
1066        return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1067
1068    @cached_property
1069    def reverse_path_infos(self) -> list[PathInfo]:
1070        return self.get_reverse_path_info()
1071
1072    def _get_m2m_db_table(self) -> str:
1073        """
1074        Function that can be curried to provide the m2m table name for this
1075        relation.
1076        """
1077        return self.remote_field.through.model_options.db_table
1078
1079    def _get_m2m_attr(self, related: Any, attr: str) -> Any:
1080        """
1081        Function that can be curried to provide the source accessor or DB
1082        column name for the m2m table.
1083        """
1084        cache_attr = f"_m2m_{attr}_cache"
1085        if hasattr(self, cache_attr):
1086            return getattr(self, cache_attr)
1087        if self.remote_field.through_fields is not None:
1088            link_field_name: str | None = self.remote_field.through_fields[0]
1089        else:
1090            link_field_name = None
1091        for f in self.remote_field.through._model_meta.fields:
1092            if (
1093                isinstance(f, RelatedField)
1094                and f.remote_field.model == related.related_model
1095                and (link_field_name is None or link_field_name == f.name)
1096            ):
1097                setattr(self, cache_attr, getattr(f, attr))
1098                return getattr(self, cache_attr)
1099        return None
1100
1101    def _get_m2m_reverse_attr(self, related: Any, attr: str) -> Any:
1102        """
1103        Function that can be curried to provide the related accessor or DB
1104        column name for the m2m table.
1105        """
1106        cache_attr = f"_m2m_reverse_{attr}_cache"
1107        if hasattr(self, cache_attr):
1108            return getattr(self, cache_attr)
1109        found = False
1110        if self.remote_field.through_fields is not None:
1111            link_field_name: str | None = self.remote_field.through_fields[1]
1112        else:
1113            link_field_name = None
1114        for f in self.remote_field.through._model_meta.fields:
1115            if isinstance(f, RelatedField) and f.remote_field.model == related.model:
1116                if link_field_name is None and related.related_model == related.model:
1117                    # If this is an m2m-intermediate to self,
1118                    # the first foreign key you find will be
1119                    # the source column. Keep searching for
1120                    # the second foreign key.
1121                    if found:
1122                        setattr(self, cache_attr, getattr(f, attr))
1123                        break
1124                    else:
1125                        found = True
1126                elif link_field_name is None or link_field_name == f.name:
1127                    setattr(self, cache_attr, getattr(f, attr))
1128                    break
1129        return getattr(self, cache_attr)
1130
1131    def contribute_to_class(self, cls: type[Model], name: str) -> None:
1132        super().contribute_to_class(cls, name)
1133
1134        def resolve_through_model(
1135            _: Any, model: type[Model], field: ManyToManyField
1136        ) -> None:
1137            field.remote_field.through = model
1138
1139        lazy_related_operation(
1140            resolve_through_model,
1141            cls,
1142            self.remote_field.through,
1143            field=self,
1144        )
1145
1146        # Add the descriptor for the m2m relation.
1147        setattr(cls, self.name, ForwardManyToManyDescriptor(self.remote_field))  # type: ignore[arg-type]
1148
1149        # Set up the accessor for the m2m table name for the relation.
1150        self.m2m_db_table = self._get_m2m_db_table
1151
1152    def do_related_class(self, other: type[Model], cls: type[Model]) -> None:
1153        """Set up M2M metadata accessors for the through table."""
1154        super().do_related_class(other, cls)
1155
1156        # Set up the accessors for the column names on the m2m table.
1157        # These are used during query construction and schema operations.
1158        related = self.remote_field
1159        self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1160        self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1161
1162        self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1163        self.m2m_reverse_field_name = partial(
1164            self._get_m2m_reverse_attr, related, "name"
1165        )
1166
1167        get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1168        self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1169        get_m2m_reverse_rel = partial(
1170            self._get_m2m_reverse_attr, related, "remote_field"
1171        )
1172        self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1173
1174    def set_attributes_from_rel(self) -> None:
1175        pass
1176
1177    def value_from_object(self, obj: Model) -> list[Any]:
1178        return [] if obj.id is None else list(getattr(obj, self.attname).all())
1179
1180    def save_form_data(self, instance: Model, data: Any) -> None:
1181        getattr(instance, self.attname).set(data)
1182
1183    def db_check(self) -> None:
1184        return None
1185
1186    def db_type(self) -> None:
1187        # A ManyToManyField is not represented by a single column,
1188        # so return None.
1189        return None
1190
1191    def db_parameters(self) -> DbParameters:
1192        return {"type": None, "check": None}