Plain is headed towards 1.0! Subscribe for development updates →

   1import functools
   2import inspect
   3from functools import partial
   4
   5from plain import exceptions, preflight
   6from plain.models.backends import utils
   7from plain.models.constants import LOOKUP_SEP
   8from plain.models.db import connection, router
   9from plain.models.deletion import CASCADE, SET_DEFAULT, SET_NULL
  10from plain.models.query_utils import PathInfo, Q
  11from plain.models.utils import make_model_tuple
  12from plain.packages import packages
  13from plain.runtime import settings
  14from plain.runtime.user_settings import SettingsReference
  15from plain.utils.functional import cached_property
  16
  17from . import Field
  18from .mixins import FieldCacheMixin
  19from .related_descriptors import (
  20    ForeignKeyDeferredAttribute,
  21    ForwardManyToOneDescriptor,
  22    ForwardOneToOneDescriptor,
  23    ManyToManyDescriptor,
  24    ReverseManyToOneDescriptor,
  25    ReverseOneToOneDescriptor,
  26)
  27from .related_lookups import (
  28    RelatedExact,
  29    RelatedGreaterThan,
  30    RelatedGreaterThanOrEqual,
  31    RelatedIn,
  32    RelatedIsNull,
  33    RelatedLessThan,
  34    RelatedLessThanOrEqual,
  35)
  36from .reverse_related import ForeignObjectRel, ManyToManyRel, ManyToOneRel, OneToOneRel
  37
  38RECURSIVE_RELATIONSHIP_CONSTANT = "self"
  39
  40
  41def resolve_relation(scope_model, relation):
  42    """
  43    Transform relation into a model or fully-qualified model string of the form
  44    "package_label.ModelName", relative to scope_model.
  45
  46    The relation argument can be:
  47      * RECURSIVE_RELATIONSHIP_CONSTANT, i.e. the string "self", in which case
  48        the model argument will be returned.
  49      * A bare model name without an package_label, in which case scope_model's
  50        package_label will be prepended.
  51      * An "package_label.ModelName" string.
  52      * A model class, which will be returned unchanged.
  53    """
  54    # Check for recursive relations
  55    if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
  56        relation = scope_model
  57
  58    # Look for an "app.Model" relation
  59    if isinstance(relation, str):
  60        if "." not in relation:
  61            relation = f"{scope_model._meta.package_label}.{relation}"
  62
  63    return relation
  64
  65
  66def lazy_related_operation(function, model, *related_models, **kwargs):
  67    """
  68    Schedule `function` to be called once `model` and all `related_models`
  69    have been imported and registered with the app registry. `function` will
  70    be called with the newly-loaded model classes as its positional arguments,
  71    plus any optional keyword arguments.
  72
  73    The `model` argument must be a model class. Each subsequent positional
  74    argument is another model, or a reference to another model - see
  75    `resolve_relation()` for the various forms these may take. Any relative
  76    references will be resolved relative to `model`.
  77
  78    This is a convenience wrapper for `Packages.lazy_model_operation` - the app
  79    registry model used is the one found in `model._meta.packages`.
  80    """
  81    models = [model] + [resolve_relation(model, rel) for rel in related_models]
  82    model_keys = (make_model_tuple(m) for m in models)
  83    packages = model._meta.packages
  84    return packages.lazy_model_operation(partial(function, **kwargs), *model_keys)
  85
  86
  87class RelatedField(FieldCacheMixin, Field):
  88    """Base class that all relational fields inherit from."""
  89
  90    # Field flags
  91    one_to_many = False
  92    one_to_one = False
  93    many_to_many = False
  94    many_to_one = False
  95
  96    def __init__(
  97        self,
  98        related_name=None,
  99        related_query_name=None,
 100        limit_choices_to=None,
 101        **kwargs,
 102    ):
 103        self._related_name = related_name
 104        self._related_query_name = related_query_name
 105        self._limit_choices_to = limit_choices_to
 106        super().__init__(**kwargs)
 107
 108    @cached_property
 109    def related_model(self):
 110        # Can't cache this property until all the models are loaded.
 111        packages.check_models_ready()
 112        return self.remote_field.model
 113
 114    def check(self, **kwargs):
 115        return [
 116            *super().check(**kwargs),
 117            *self._check_related_name_is_valid(),
 118            *self._check_related_query_name_is_valid(),
 119            *self._check_relation_model_exists(),
 120            *self._check_referencing_to_swapped_model(),
 121            *self._check_clashes(),
 122        ]
 123
 124    def _check_related_name_is_valid(self):
 125        import keyword
 126
 127        related_name = self.remote_field.related_name
 128        if related_name is None:
 129            return []
 130        is_valid_id = (
 131            not keyword.iskeyword(related_name) and related_name.isidentifier()
 132        )
 133        if not (is_valid_id or related_name.endswith("+")):
 134            return [
 135                preflight.Error(
 136                    "The name '{}' is invalid related_name for field {}.{}".format(
 137                        self.remote_field.related_name,
 138                        self.model._meta.object_name,
 139                        self.name,
 140                    ),
 141                    hint=(
 142                        "Related name must be a valid Python identifier or end with a "
 143                        "'+'"
 144                    ),
 145                    obj=self,
 146                    id="fields.E306",
 147                )
 148            ]
 149        return []
 150
 151    def _check_related_query_name_is_valid(self):
 152        if self.remote_field.is_hidden():
 153            return []
 154        rel_query_name = self.related_query_name()
 155        errors = []
 156        if rel_query_name.endswith("_"):
 157            errors.append(
 158                preflight.Error(
 159                    "Reverse query name '%s' must not end with an underscore."
 160                    % rel_query_name,
 161                    hint=(
 162                        "Add or change a related_name or related_query_name "
 163                        "argument for this field."
 164                    ),
 165                    obj=self,
 166                    id="fields.E308",
 167                )
 168            )
 169        if LOOKUP_SEP in rel_query_name:
 170            errors.append(
 171                preflight.Error(
 172                    "Reverse query name '{}' must not contain '{}'.".format(
 173                        rel_query_name, LOOKUP_SEP
 174                    ),
 175                    hint=(
 176                        "Add or change a related_name or related_query_name "
 177                        "argument for this field."
 178                    ),
 179                    obj=self,
 180                    id="fields.E309",
 181                )
 182            )
 183        return errors
 184
 185    def _check_relation_model_exists(self):
 186        rel_is_missing = self.remote_field.model not in self.opts.packages.get_models()
 187        rel_is_string = isinstance(self.remote_field.model, str)
 188        model_name = (
 189            self.remote_field.model
 190            if rel_is_string
 191            else self.remote_field.model._meta.object_name
 192        )
 193        if rel_is_missing and (
 194            rel_is_string or not self.remote_field.model._meta.swapped
 195        ):
 196            return [
 197                preflight.Error(
 198                    "Field defines a relation with model '%s', which is either "
 199                    "not installed, or is abstract." % model_name,
 200                    obj=self,
 201                    id="fields.E300",
 202                )
 203            ]
 204        return []
 205
 206    def _check_referencing_to_swapped_model(self):
 207        if (
 208            self.remote_field.model not in self.opts.packages.get_models()
 209            and not isinstance(self.remote_field.model, str)
 210            and self.remote_field.model._meta.swapped
 211        ):
 212            return [
 213                preflight.Error(
 214                    "Field defines a relation with the model '%s', which has "
 215                    "been swapped out." % self.remote_field.model._meta.label,
 216                    hint="Update the relation to point at 'settings.%s'."
 217                    % self.remote_field.model._meta.swappable,
 218                    obj=self,
 219                    id="fields.E301",
 220                )
 221            ]
 222        return []
 223
 224    def _check_clashes(self):
 225        """Check accessor and reverse query name clashes."""
 226        from plain.models.base import ModelBase
 227
 228        errors = []
 229        opts = self.model._meta
 230
 231        # f.remote_field.model may be a string instead of a model. Skip if
 232        # model name is not resolved.
 233        if not isinstance(self.remote_field.model, ModelBase):
 234            return []
 235
 236        # Consider that we are checking field `Model.foreign` and the models
 237        # are:
 238        #
 239        #     class Target(models.Model):
 240        #         model = models.IntegerField()
 241        #         model_set = models.IntegerField()
 242        #
 243        #     class Model(models.Model):
 244        #         foreign = models.ForeignKey(Target)
 245        #         m2m = models.ManyToManyField(Target)
 246
 247        # rel_opts.object_name == "Target"
 248        rel_opts = self.remote_field.model._meta
 249        # If the field doesn't install a backward relation on the target model
 250        # (so `is_hidden` returns True), then there are no clashes to check
 251        # and we can skip these fields.
 252        rel_is_hidden = self.remote_field.is_hidden()
 253        rel_name = self.remote_field.get_accessor_name()  # i. e. "model_set"
 254        rel_query_name = self.related_query_name()  # i. e. "model"
 255        # i.e. "package_label.Model.field".
 256        field_name = f"{opts.label}.{self.name}"
 257
 258        # Check clashes between accessor or reverse query name of `field`
 259        # and any other field name -- i.e. accessor for Model.foreign is
 260        # model_set and it clashes with Target.model_set.
 261        potential_clashes = rel_opts.fields + rel_opts.many_to_many
 262        for clash_field in potential_clashes:
 263            # i.e. "package_label.Target.model_set".
 264            clash_name = f"{rel_opts.label}.{clash_field.name}"
 265            if not rel_is_hidden and clash_field.name == rel_name:
 266                errors.append(
 267                    preflight.Error(
 268                        f"Reverse accessor '{rel_opts.object_name}.{rel_name}' "
 269                        f"for '{field_name}' clashes with field name "
 270                        f"'{clash_name}'.",
 271                        hint=(
 272                            "Rename field '{}', or add/change a related_name "
 273                            "argument to the definition for field '{}'."
 274                        ).format(clash_name, field_name),
 275                        obj=self,
 276                        id="fields.E302",
 277                    )
 278                )
 279
 280            if clash_field.name == rel_query_name:
 281                errors.append(
 282                    preflight.Error(
 283                        "Reverse query name for '{}' clashes with field name '{}'.".format(
 284                            field_name, clash_name
 285                        ),
 286                        hint=(
 287                            "Rename field '{}', or add/change a related_name "
 288                            "argument to the definition for field '{}'."
 289                        ).format(clash_name, field_name),
 290                        obj=self,
 291                        id="fields.E303",
 292                    )
 293                )
 294
 295        # Check clashes between accessors/reverse query names of `field` and
 296        # any other field accessor -- i. e. Model.foreign accessor clashes with
 297        # Model.m2m accessor.
 298        potential_clashes = (r for r in rel_opts.related_objects if r.field is not self)
 299        for clash_field in potential_clashes:
 300            # i.e. "package_label.Model.m2m".
 301            clash_name = "{}.{}".format(
 302                clash_field.related_model._meta.label,
 303                clash_field.field.name,
 304            )
 305            if not rel_is_hidden and clash_field.get_accessor_name() == rel_name:
 306                errors.append(
 307                    preflight.Error(
 308                        f"Reverse accessor '{rel_opts.object_name}.{rel_name}' "
 309                        f"for '{field_name}' clashes with reverse accessor for "
 310                        f"'{clash_name}'.",
 311                        hint=(
 312                            "Add or change a related_name argument "
 313                            f"to the definition for '{field_name}' or '{clash_name}'."
 314                        ),
 315                        obj=self,
 316                        id="fields.E304",
 317                    )
 318                )
 319
 320            if clash_field.get_accessor_name() == rel_query_name:
 321                errors.append(
 322                    preflight.Error(
 323                        "Reverse query name for '{}' clashes with reverse query name "
 324                        "for '{}'.".format(field_name, clash_name),
 325                        hint=(
 326                            "Add or change a related_name argument "
 327                            f"to the definition for '{field_name}' or '{clash_name}'."
 328                        ),
 329                        obj=self,
 330                        id="fields.E305",
 331                    )
 332                )
 333
 334        return errors
 335
 336    def db_type(self, connection):
 337        # By default related field will not have a column as it relates to
 338        # columns from another table.
 339        return None
 340
 341    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
 342        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)
 343
 344        self.opts = cls._meta
 345
 346        if not cls._meta.abstract:
 347            if self.remote_field.related_name:
 348                related_name = self.remote_field.related_name
 349            else:
 350                related_name = self.opts.default_related_name
 351            if related_name:
 352                related_name %= {
 353                    "class": cls.__name__.lower(),
 354                    "model_name": cls._meta.model_name.lower(),
 355                    "package_label": cls._meta.package_label.lower(),
 356                }
 357                self.remote_field.related_name = related_name
 358
 359            if self.remote_field.related_query_name:
 360                related_query_name = self.remote_field.related_query_name % {
 361                    "class": cls.__name__.lower(),
 362                    "package_label": cls._meta.package_label.lower(),
 363                }
 364                self.remote_field.related_query_name = related_query_name
 365
 366            def resolve_related_class(model, related, field):
 367                field.remote_field.model = related
 368                field.do_related_class(related, model)
 369
 370            lazy_related_operation(
 371                resolve_related_class, cls, self.remote_field.model, field=self
 372            )
 373
 374    def deconstruct(self):
 375        name, path, args, kwargs = super().deconstruct()
 376        if self._limit_choices_to:
 377            kwargs["limit_choices_to"] = self._limit_choices_to
 378        if self._related_name is not None:
 379            kwargs["related_name"] = self._related_name
 380        if self._related_query_name is not None:
 381            kwargs["related_query_name"] = self._related_query_name
 382        return name, path, args, kwargs
 383
 384    def get_forward_related_filter(self, obj):
 385        """
 386        Return the keyword arguments that when supplied to
 387        self.model.object.filter(), would select all instances related through
 388        this field to the remote obj. This is used to build the querysets
 389        returned by related descriptors. obj is an instance of
 390        self.related_field.model.
 391        """
 392        return {
 393            f"{self.name}__{rh_field.name}": getattr(obj, rh_field.attname)
 394            for _, rh_field in self.related_fields
 395        }
 396
 397    def get_reverse_related_filter(self, obj):
 398        """
 399        Complement to get_forward_related_filter(). Return the keyword
 400        arguments that when passed to self.related_field.model.object.filter()
 401        select all instances of self.related_field.model related through
 402        this field to obj. obj is an instance of self.model.
 403        """
 404        base_q = Q.create(
 405            [
 406                (rh_field.attname, getattr(obj, lh_field.attname))
 407                for lh_field, rh_field in self.related_fields
 408            ]
 409        )
 410        descriptor_filter = self.get_extra_descriptor_filter(obj)
 411        if isinstance(descriptor_filter, dict):
 412            return base_q & Q(**descriptor_filter)
 413        elif descriptor_filter:
 414            return base_q & descriptor_filter
 415        return base_q
 416
 417    @property
 418    def swappable_setting(self):
 419        """
 420        Get the setting that this is powered from for swapping, or None
 421        if it's not swapped in / marked with swappable=False.
 422        """
 423        if self.swappable:
 424            # Work out string form of "to"
 425            if isinstance(self.remote_field.model, str):
 426                to_string = self.remote_field.model
 427            else:
 428                to_string = self.remote_field.model._meta.label
 429            return packages.get_swappable_settings_name(to_string)
 430        return None
 431
 432    def set_attributes_from_rel(self):
 433        self.name = self.name or (
 434            self.remote_field.model._meta.model_name
 435            + "_"
 436            + self.remote_field.model._meta.pk.name
 437        )
 438        self.remote_field.set_field_name()
 439
 440    def do_related_class(self, other, cls):
 441        self.set_attributes_from_rel()
 442        self.contribute_to_related_class(other, self.remote_field)
 443
 444    def get_limit_choices_to(self):
 445        """
 446        Return ``limit_choices_to`` for this model field.
 447
 448        If it is a callable, it will be invoked and the result will be
 449        returned.
 450        """
 451        if callable(self.remote_field.limit_choices_to):
 452            return self.remote_field.limit_choices_to()
 453        return self.remote_field.limit_choices_to
 454
 455    def related_query_name(self):
 456        """
 457        Define the name that can be used to identify this related object in a
 458        table-spanning query.
 459        """
 460        return (
 461            self.remote_field.related_query_name
 462            or self.remote_field.related_name
 463            or self.opts.model_name
 464        )
 465
 466    @property
 467    def target_field(self):
 468        """
 469        When filtering against this relation, return the field on the remote
 470        model against which the filtering should happen.
 471        """
 472        target_fields = self.path_infos[-1].target_fields
 473        if len(target_fields) > 1:
 474            raise exceptions.FieldError(
 475                "The relation has multiple target fields, but only single target field "
 476                "was asked for"
 477            )
 478        return target_fields[0]
 479
 480    def get_cache_name(self):
 481        return self.name
 482
 483
 484class ForeignObject(RelatedField):
 485    """
 486    Abstraction of the ForeignKey relation to support multi-column relations.
 487    """
 488
 489    # Field flags
 490    many_to_many = False
 491    many_to_one = True
 492    one_to_many = False
 493    one_to_one = False
 494
 495    requires_unique_target = True
 496    related_accessor_class = ReverseManyToOneDescriptor
 497    forward_related_accessor_class = ForwardManyToOneDescriptor
 498    rel_class = ForeignObjectRel
 499
 500    def __init__(
 501        self,
 502        to,
 503        on_delete,
 504        from_fields,
 505        to_fields,
 506        rel=None,
 507        related_name=None,
 508        related_query_name=None,
 509        limit_choices_to=None,
 510        parent_link=False,
 511        swappable=True,
 512        **kwargs,
 513    ):
 514        if rel is None:
 515            rel = self.rel_class(
 516                self,
 517                to,
 518                related_name=related_name,
 519                related_query_name=related_query_name,
 520                limit_choices_to=limit_choices_to,
 521                parent_link=parent_link,
 522                on_delete=on_delete,
 523            )
 524
 525        super().__init__(
 526            rel=rel,
 527            related_name=related_name,
 528            related_query_name=related_query_name,
 529            limit_choices_to=limit_choices_to,
 530            **kwargs,
 531        )
 532
 533        self.from_fields = from_fields
 534        self.to_fields = to_fields
 535        self.swappable = swappable
 536
 537    def __copy__(self):
 538        obj = super().__copy__()
 539        # Remove any cached PathInfo values.
 540        obj.__dict__.pop("path_infos", None)
 541        obj.__dict__.pop("reverse_path_infos", None)
 542        return obj
 543
 544    def check(self, **kwargs):
 545        return [
 546            *super().check(**kwargs),
 547            *self._check_to_fields_exist(),
 548            *self._check_unique_target(),
 549        ]
 550
 551    def _check_to_fields_exist(self):
 552        # Skip nonexistent models.
 553        if isinstance(self.remote_field.model, str):
 554            return []
 555
 556        errors = []
 557        for to_field in self.to_fields:
 558            if to_field:
 559                try:
 560                    self.remote_field.model._meta.get_field(to_field)
 561                except exceptions.FieldDoesNotExist:
 562                    errors.append(
 563                        preflight.Error(
 564                            f"The to_field '{to_field}' doesn't exist on the related "
 565                            f"model '{self.remote_field.model._meta.label}'.",
 566                            obj=self,
 567                            id="fields.E312",
 568                        )
 569                    )
 570        return errors
 571
 572    def _check_unique_target(self):
 573        rel_is_string = isinstance(self.remote_field.model, str)
 574        if rel_is_string or not self.requires_unique_target:
 575            return []
 576
 577        try:
 578            self.foreign_related_fields
 579        except exceptions.FieldDoesNotExist:
 580            return []
 581
 582        if not self.foreign_related_fields:
 583            return []
 584
 585        unique_foreign_fields = {
 586            frozenset([f.name])
 587            for f in self.remote_field.model._meta.get_fields()
 588            if getattr(f, "unique", False)
 589        }
 590        unique_foreign_fields.update(
 591            {
 592                frozenset(uc.fields)
 593                for uc in self.remote_field.model._meta.total_unique_constraints
 594            }
 595        )
 596        foreign_fields = {f.name for f in self.foreign_related_fields}
 597        has_unique_constraint = any(u <= foreign_fields for u in unique_foreign_fields)
 598
 599        if not has_unique_constraint and len(self.foreign_related_fields) > 1:
 600            field_combination = ", ".join(
 601                "'%s'" % rel_field.name for rel_field in self.foreign_related_fields
 602            )
 603            model_name = self.remote_field.model.__name__
 604            return [
 605                preflight.Error(
 606                    "No subset of the fields {} on model '{}' is unique.".format(
 607                        field_combination, model_name
 608                    ),
 609                    hint=(
 610                        "Mark a single field as unique=True or add a set of "
 611                        "fields to a unique constraint (via "
 612                        "a UniqueConstraint (without condition) in the "
 613                        "model Meta.constraints)."
 614                    ),
 615                    obj=self,
 616                    id="fields.E310",
 617                )
 618            ]
 619        elif not has_unique_constraint:
 620            field_name = self.foreign_related_fields[0].name
 621            model_name = self.remote_field.model.__name__
 622            return [
 623                preflight.Error(
 624                    "'{}.{}' must be unique because it is referenced by "
 625                    "a foreign key.".format(model_name, field_name),
 626                    hint=(
 627                        "Add unique=True to this field or add a "
 628                        "UniqueConstraint (without condition) in the model "
 629                        "Meta.constraints."
 630                    ),
 631                    obj=self,
 632                    id="fields.E311",
 633                )
 634            ]
 635        else:
 636            return []
 637
 638    def deconstruct(self):
 639        name, path, args, kwargs = super().deconstruct()
 640        kwargs["on_delete"] = self.remote_field.on_delete
 641        kwargs["from_fields"] = self.from_fields
 642        kwargs["to_fields"] = self.to_fields
 643
 644        if self.remote_field.parent_link:
 645            kwargs["parent_link"] = self.remote_field.parent_link
 646        if isinstance(self.remote_field.model, str):
 647            if "." in self.remote_field.model:
 648                package_label, model_name = self.remote_field.model.split(".")
 649                kwargs["to"] = f"{package_label}.{model_name.lower()}"
 650            else:
 651                kwargs["to"] = self.remote_field.model.lower()
 652        else:
 653            kwargs["to"] = self.remote_field.model._meta.label_lower
 654        # If swappable is True, then see if we're actually pointing to the target
 655        # of a swap.
 656        swappable_setting = self.swappable_setting
 657        if swappable_setting is not None:
 658            # If it's already a settings reference, error
 659            if hasattr(kwargs["to"], "setting_name"):
 660                if kwargs["to"].setting_name != swappable_setting:
 661                    raise ValueError(
 662                        "Cannot deconstruct a ForeignKey pointing to a model "
 663                        "that is swapped in place of more than one model ({} and {})".format(
 664                            kwargs["to"].setting_name, swappable_setting
 665                        )
 666                    )
 667            # Set it
 668            kwargs["to"] = SettingsReference(
 669                kwargs["to"],
 670                swappable_setting,
 671            )
 672        return name, path, args, kwargs
 673
 674    def resolve_related_fields(self):
 675        if not self.from_fields or len(self.from_fields) != len(self.to_fields):
 676            raise ValueError(
 677                "Foreign Object from and to fields must be the same non-zero length"
 678            )
 679        if isinstance(self.remote_field.model, str):
 680            raise ValueError(
 681                "Related model %r cannot be resolved" % self.remote_field.model
 682            )
 683        related_fields = []
 684        for index in range(len(self.from_fields)):
 685            from_field_name = self.from_fields[index]
 686            to_field_name = self.to_fields[index]
 687            from_field = (
 688                self
 689                if from_field_name == RECURSIVE_RELATIONSHIP_CONSTANT
 690                else self.opts.get_field(from_field_name)
 691            )
 692            to_field = (
 693                self.remote_field.model._meta.pk
 694                if to_field_name is None
 695                else self.remote_field.model._meta.get_field(to_field_name)
 696            )
 697            related_fields.append((from_field, to_field))
 698        return related_fields
 699
 700    @cached_property
 701    def related_fields(self):
 702        return self.resolve_related_fields()
 703
 704    @cached_property
 705    def reverse_related_fields(self):
 706        return [(rhs_field, lhs_field) for lhs_field, rhs_field in self.related_fields]
 707
 708    @cached_property
 709    def local_related_fields(self):
 710        return tuple(lhs_field for lhs_field, rhs_field in self.related_fields)
 711
 712    @cached_property
 713    def foreign_related_fields(self):
 714        return tuple(
 715            rhs_field for lhs_field, rhs_field in self.related_fields if rhs_field
 716        )
 717
 718    def get_local_related_value(self, instance):
 719        return self.get_instance_value_for_fields(instance, self.local_related_fields)
 720
 721    def get_foreign_related_value(self, instance):
 722        return self.get_instance_value_for_fields(instance, self.foreign_related_fields)
 723
 724    @staticmethod
 725    def get_instance_value_for_fields(instance, fields):
 726        ret = []
 727        opts = instance._meta
 728        for field in fields:
 729            # Gotcha: in some cases (like fixture loading) a model can have
 730            # different values in parent_ptr_id and parent's id. So, use
 731            # instance.pk (that is, parent_ptr_id) when asked for instance.id.
 732            if field.primary_key:
 733                possible_parent_link = opts.get_ancestor_link(field.model)
 734                if (
 735                    not possible_parent_link
 736                    or possible_parent_link.primary_key
 737                    or possible_parent_link.model._meta.abstract
 738                ):
 739                    ret.append(instance.pk)
 740                    continue
 741            ret.append(getattr(instance, field.attname))
 742        return tuple(ret)
 743
 744    def get_attname_column(self):
 745        attname, column = super().get_attname_column()
 746        return attname, None
 747
 748    def get_joining_columns(self, reverse_join=False):
 749        source = self.reverse_related_fields if reverse_join else self.related_fields
 750        return tuple(
 751            (lhs_field.column, rhs_field.column) for lhs_field, rhs_field in source
 752        )
 753
 754    def get_reverse_joining_columns(self):
 755        return self.get_joining_columns(reverse_join=True)
 756
 757    def get_extra_descriptor_filter(self, instance):
 758        """
 759        Return an extra filter condition for related object fetching when
 760        user does 'instance.fieldname', that is the extra filter is used in
 761        the descriptor of the field.
 762
 763        The filter should be either a dict usable in .filter(**kwargs) call or
 764        a Q-object. The condition will be ANDed together with the relation's
 765        joining columns.
 766
 767        A parallel method is get_extra_restriction() which is used in
 768        JOIN and subquery conditions.
 769        """
 770        return {}
 771
 772    def get_extra_restriction(self, alias, related_alias):
 773        """
 774        Return a pair condition used for joining and subquery pushdown. The
 775        condition is something that responds to as_sql(compiler, connection)
 776        method.
 777
 778        Note that currently referring both the 'alias' and 'related_alias'
 779        will not work in some conditions, like subquery pushdown.
 780
 781        A parallel method is get_extra_descriptor_filter() which is used in
 782        instance.fieldname related object fetching.
 783        """
 784        return None
 785
 786    def get_path_info(self, filtered_relation=None):
 787        """Get path from this field to the related model."""
 788        opts = self.remote_field.model._meta
 789        from_opts = self.model._meta
 790        return [
 791            PathInfo(
 792                from_opts=from_opts,
 793                to_opts=opts,
 794                target_fields=self.foreign_related_fields,
 795                join_field=self,
 796                m2m=False,
 797                direct=True,
 798                filtered_relation=filtered_relation,
 799            )
 800        ]
 801
 802    @cached_property
 803    def path_infos(self):
 804        return self.get_path_info()
 805
 806    def get_reverse_path_info(self, filtered_relation=None):
 807        """Get path from the related model to this field's model."""
 808        opts = self.model._meta
 809        from_opts = self.remote_field.model._meta
 810        return [
 811            PathInfo(
 812                from_opts=from_opts,
 813                to_opts=opts,
 814                target_fields=(opts.pk,),
 815                join_field=self.remote_field,
 816                m2m=not self.unique,
 817                direct=False,
 818                filtered_relation=filtered_relation,
 819            )
 820        ]
 821
 822    @cached_property
 823    def reverse_path_infos(self):
 824        return self.get_reverse_path_info()
 825
 826    @classmethod
 827    @functools.cache
 828    def get_class_lookups(cls):
 829        bases = inspect.getmro(cls)
 830        bases = bases[: bases.index(ForeignObject) + 1]
 831        class_lookups = [parent.__dict__.get("class_lookups", {}) for parent in bases]
 832        return cls.merge_dicts(class_lookups)
 833
 834    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
 835        super().contribute_to_class(cls, name, private_only=private_only, **kwargs)
 836        setattr(cls, self.name, self.forward_related_accessor_class(self))
 837
 838    def contribute_to_related_class(self, cls, related):
 839        # Internal FK's - i.e., those with a related name ending with '+' -
 840        # and swapped models don't get a related descriptor.
 841        if (
 842            not self.remote_field.is_hidden()
 843            and not related.related_model._meta.swapped
 844        ):
 845            setattr(
 846                cls._meta.concrete_model,
 847                related.get_accessor_name(),
 848                self.related_accessor_class(related),
 849            )
 850            # While 'limit_choices_to' might be a callable, simply pass
 851            # it along for later - this is too early because it's still
 852            # model load time.
 853            if self.remote_field.limit_choices_to:
 854                cls._meta.related_fkey_lookups.append(
 855                    self.remote_field.limit_choices_to
 856                )
 857
 858
 859ForeignObject.register_lookup(RelatedIn)
 860ForeignObject.register_lookup(RelatedExact)
 861ForeignObject.register_lookup(RelatedLessThan)
 862ForeignObject.register_lookup(RelatedGreaterThan)
 863ForeignObject.register_lookup(RelatedGreaterThanOrEqual)
 864ForeignObject.register_lookup(RelatedLessThanOrEqual)
 865ForeignObject.register_lookup(RelatedIsNull)
 866
 867
 868class ForeignKey(ForeignObject):
 869    """
 870    Provide a many-to-one relation by adding a column to the local model
 871    to hold the remote value.
 872
 873    By default ForeignKey will target the pk of the remote model but this
 874    behavior can be changed by using the ``to_field`` argument.
 875    """
 876
 877    descriptor_class = ForeignKeyDeferredAttribute
 878    # Field flags
 879    many_to_many = False
 880    many_to_one = True
 881    one_to_many = False
 882    one_to_one = False
 883
 884    rel_class = ManyToOneRel
 885
 886    empty_strings_allowed = False
 887    default_error_messages = {
 888        "invalid": "%(model)s instance with %(field)s %(value)r does not exist."
 889    }
 890    description = "Foreign Key (type determined by related field)"
 891
 892    def __init__(
 893        self,
 894        to,
 895        on_delete,
 896        related_name=None,
 897        related_query_name=None,
 898        limit_choices_to=None,
 899        parent_link=False,
 900        to_field=None,
 901        db_constraint=True,
 902        **kwargs,
 903    ):
 904        try:
 905            to._meta.model_name
 906        except AttributeError:
 907            if not isinstance(to, str):
 908                raise TypeError(
 909                    "{}({!r}) is invalid. First parameter to ForeignKey must be "
 910                    "either a model, a model name, or the string {!r}".format(
 911                        self.__class__.__name__,
 912                        to,
 913                        RECURSIVE_RELATIONSHIP_CONSTANT,
 914                    )
 915                )
 916        else:
 917            # For backwards compatibility purposes, we need to *try* and set
 918            # the to_field during FK construction. It won't be guaranteed to
 919            # be correct until contribute_to_class is called. Refs #12190.
 920            to_field = to_field or (to._meta.pk and to._meta.pk.name)
 921        if not callable(on_delete):
 922            raise TypeError("on_delete must be callable.")
 923
 924        kwargs["rel"] = self.rel_class(
 925            self,
 926            to,
 927            to_field,
 928            related_name=related_name,
 929            related_query_name=related_query_name,
 930            limit_choices_to=limit_choices_to,
 931            parent_link=parent_link,
 932            on_delete=on_delete,
 933        )
 934        kwargs.setdefault("db_index", True)
 935
 936        super().__init__(
 937            to,
 938            on_delete,
 939            related_name=related_name,
 940            related_query_name=related_query_name,
 941            limit_choices_to=limit_choices_to,
 942            from_fields=[RECURSIVE_RELATIONSHIP_CONSTANT],
 943            to_fields=[to_field],
 944            **kwargs,
 945        )
 946        self.db_constraint = db_constraint
 947
 948    def __class_getitem__(cls, *args, **kwargs):
 949        return cls
 950
 951    def check(self, **kwargs):
 952        return [
 953            *super().check(**kwargs),
 954            *self._check_on_delete(),
 955            *self._check_unique(),
 956        ]
 957
 958    def _check_on_delete(self):
 959        on_delete = getattr(self.remote_field, "on_delete", None)
 960        if on_delete == SET_NULL and not self.null:
 961            return [
 962                preflight.Error(
 963                    "Field specifies on_delete=SET_NULL, but cannot be null.",
 964                    hint=(
 965                        "Set null=True argument on the field, or change the on_delete "
 966                        "rule."
 967                    ),
 968                    obj=self,
 969                    id="fields.E320",
 970                )
 971            ]
 972        elif on_delete == SET_DEFAULT and not self.has_default():
 973            return [
 974                preflight.Error(
 975                    "Field specifies on_delete=SET_DEFAULT, but has no default value.",
 976                    hint="Set a default value, or change the on_delete rule.",
 977                    obj=self,
 978                    id="fields.E321",
 979                )
 980            ]
 981        else:
 982            return []
 983
 984    def _check_unique(self, **kwargs):
 985        return (
 986            [
 987                preflight.Warning(
 988                    "Setting unique=True on a ForeignKey has the same effect as using "
 989                    "a OneToOneField.",
 990                    hint=(
 991                        "ForeignKey(unique=True) is usually better served by a "
 992                        "OneToOneField."
 993                    ),
 994                    obj=self,
 995                    id="fields.W342",
 996                )
 997            ]
 998            if self.unique
 999            else []
1000        )
1001
1002    def deconstruct(self):
1003        name, path, args, kwargs = super().deconstruct()
1004        del kwargs["to_fields"]
1005        del kwargs["from_fields"]
1006        # Handle the simpler arguments
1007        if self.db_index:
1008            del kwargs["db_index"]
1009        else:
1010            kwargs["db_index"] = False
1011        if self.db_constraint is not True:
1012            kwargs["db_constraint"] = self.db_constraint
1013        # Rel needs more work.
1014        to_meta = getattr(self.remote_field.model, "_meta", None)
1015        if self.remote_field.field_name and (
1016            not to_meta
1017            or (to_meta.pk and self.remote_field.field_name != to_meta.pk.name)
1018        ):
1019            kwargs["to_field"] = self.remote_field.field_name
1020        return name, path, args, kwargs
1021
1022    def to_python(self, value):
1023        return self.target_field.to_python(value)
1024
1025    @property
1026    def target_field(self):
1027        return self.foreign_related_fields[0]
1028
1029    def validate(self, value, model_instance):
1030        if self.remote_field.parent_link:
1031            return
1032        super().validate(value, model_instance)
1033        if value is None:
1034            return
1035
1036        using = router.db_for_read(self.remote_field.model, instance=model_instance)
1037        qs = self.remote_field.model._base_manager.using(using).filter(
1038            **{self.remote_field.field_name: value}
1039        )
1040        qs = qs.complex_filter(self.get_limit_choices_to())
1041        if not qs.exists():
1042            raise exceptions.ValidationError(
1043                self.error_messages["invalid"],
1044                code="invalid",
1045                params={
1046                    "model": self.remote_field.model._meta.model_name,
1047                    "pk": value,
1048                    "field": self.remote_field.field_name,
1049                    "value": value,
1050                },  # 'pk' is included for backwards compatibility
1051            )
1052
1053    def resolve_related_fields(self):
1054        related_fields = super().resolve_related_fields()
1055        for from_field, to_field in related_fields:
1056            if (
1057                to_field
1058                and to_field.model != self.remote_field.model._meta.concrete_model
1059            ):
1060                raise exceptions.FieldError(
1061                    "'{}.{}' refers to field '{}' which is not local to model "
1062                    "'{}'.".format(
1063                        self.model._meta.label,
1064                        self.name,
1065                        to_field.name,
1066                        self.remote_field.model._meta.concrete_model._meta.label,
1067                    )
1068                )
1069        return related_fields
1070
1071    def get_attname(self):
1072        return "%s_id" % self.name
1073
1074    def get_attname_column(self):
1075        attname = self.get_attname()
1076        column = self.db_column or attname
1077        return attname, column
1078
1079    def get_default(self):
1080        """Return the to_field if the default value is an object."""
1081        field_default = super().get_default()
1082        if isinstance(field_default, self.remote_field.model):
1083            return getattr(field_default, self.target_field.attname)
1084        return field_default
1085
1086    def get_db_prep_save(self, value, connection):
1087        if value is None or (
1088            value == ""
1089            and (
1090                not self.target_field.empty_strings_allowed
1091                or connection.features.interprets_empty_strings_as_nulls
1092            )
1093        ):
1094            return None
1095        else:
1096            return self.target_field.get_db_prep_save(value, connection=connection)
1097
1098    def get_db_prep_value(self, value, connection, prepared=False):
1099        return self.target_field.get_db_prep_value(value, connection, prepared)
1100
1101    def get_prep_value(self, value):
1102        return self.target_field.get_prep_value(value)
1103
1104    def contribute_to_related_class(self, cls, related):
1105        super().contribute_to_related_class(cls, related)
1106        if self.remote_field.field_name is None:
1107            self.remote_field.field_name = cls._meta.pk.name
1108
1109    def db_check(self, connection):
1110        return None
1111
1112    def db_type(self, connection):
1113        return self.target_field.rel_db_type(connection=connection)
1114
1115    def cast_db_type(self, connection):
1116        return self.target_field.cast_db_type(connection=connection)
1117
1118    def db_parameters(self, connection):
1119        target_db_parameters = self.target_field.db_parameters(connection)
1120        return {
1121            "type": self.db_type(connection),
1122            "check": self.db_check(connection),
1123            "collation": target_db_parameters.get("collation"),
1124        }
1125
1126    def convert_empty_strings(self, value, expression, connection):
1127        if (not value) and isinstance(value, str):
1128            return None
1129        return value
1130
1131    def get_db_converters(self, connection):
1132        converters = super().get_db_converters(connection)
1133        if connection.features.interprets_empty_strings_as_nulls:
1134            converters += [self.convert_empty_strings]
1135        return converters
1136
1137    def get_col(self, alias, output_field=None):
1138        if output_field is None:
1139            output_field = self.target_field
1140            while isinstance(output_field, ForeignKey):
1141                output_field = output_field.target_field
1142                if output_field is self:
1143                    raise ValueError("Cannot resolve output_field.")
1144        return super().get_col(alias, output_field)
1145
1146
1147class OneToOneField(ForeignKey):
1148    """
1149    A OneToOneField is essentially the same as a ForeignKey, with the exception
1150    that it always carries a "unique" constraint with it and the reverse
1151    relation always returns the object pointed to (since there will only ever
1152    be one), rather than returning a list.
1153    """
1154
1155    # Field flags
1156    many_to_many = False
1157    many_to_one = False
1158    one_to_many = False
1159    one_to_one = True
1160
1161    related_accessor_class = ReverseOneToOneDescriptor
1162    forward_related_accessor_class = ForwardOneToOneDescriptor
1163    rel_class = OneToOneRel
1164
1165    description = "One-to-one relationship"
1166
1167    def __init__(self, to, on_delete, to_field=None, **kwargs):
1168        kwargs["unique"] = True
1169        super().__init__(to, on_delete, to_field=to_field, **kwargs)
1170
1171    def deconstruct(self):
1172        name, path, args, kwargs = super().deconstruct()
1173        if "unique" in kwargs:
1174            del kwargs["unique"]
1175        return name, path, args, kwargs
1176
1177    def save_form_data(self, instance, data):
1178        if isinstance(data, self.remote_field.model):
1179            setattr(instance, self.name, data)
1180        else:
1181            setattr(instance, self.attname, data)
1182            # Remote field object must be cleared otherwise Model.save()
1183            # will reassign attname using the related object pk.
1184            if data is None:
1185                setattr(instance, self.name, data)
1186
1187    def _check_unique(self, **kwargs):
1188        # Override ForeignKey since check isn't applicable here.
1189        return []
1190
1191
1192def create_many_to_many_intermediary_model(field, klass):
1193    from plain import models
1194
1195    def set_managed(model, related, through):
1196        through._meta.managed = model._meta.managed or related._meta.managed
1197
1198    to_model = resolve_relation(klass, field.remote_field.model)
1199    name = f"{klass._meta.object_name}_{field.name}"
1200    lazy_related_operation(set_managed, klass, to_model, name)
1201
1202    to = make_model_tuple(to_model)[1]
1203    from_ = klass._meta.model_name
1204    if to == from_:
1205        to = "to_%s" % to
1206        from_ = "from_%s" % from_
1207
1208    meta = type(
1209        "Meta",
1210        (),
1211        {
1212            "db_table": field._get_m2m_db_table(klass._meta),
1213            "auto_created": klass,
1214            "package_label": klass._meta.package_label,
1215            "db_tablespace": klass._meta.db_tablespace,
1216            "constraints": [
1217                models.UniqueConstraint(
1218                    fields=[from_, to],
1219                    name=f"{klass._meta.package_label}_{name.lower()}_unique",
1220                )
1221            ],
1222            "packages": field.model._meta.packages,
1223        },
1224    )
1225    # Construct and return the new class.
1226    return type(
1227        name,
1228        (models.Model,),
1229        {
1230            "Meta": meta,
1231            "__module__": klass.__module__,
1232            from_: models.ForeignKey(
1233                klass,
1234                related_name="%s+" % name,
1235                db_tablespace=field.db_tablespace,
1236                db_constraint=field.remote_field.db_constraint,
1237                on_delete=CASCADE,
1238            ),
1239            to: models.ForeignKey(
1240                to_model,
1241                related_name="%s+" % name,
1242                db_tablespace=field.db_tablespace,
1243                db_constraint=field.remote_field.db_constraint,
1244                on_delete=CASCADE,
1245            ),
1246        },
1247    )
1248
1249
1250class ManyToManyField(RelatedField):
1251    """
1252    Provide a many-to-many relation by using an intermediary model that
1253    holds two ForeignKey fields pointed at the two sides of the relation.
1254
1255    Unless a ``through`` model was provided, ManyToManyField will use the
1256    create_many_to_many_intermediary_model factory to automatically generate
1257    the intermediary model.
1258    """
1259
1260    # Field flags
1261    many_to_many = True
1262    many_to_one = False
1263    one_to_many = False
1264    one_to_one = False
1265
1266    rel_class = ManyToManyRel
1267
1268    description = "Many-to-many relationship"
1269
1270    def __init__(
1271        self,
1272        to,
1273        related_name=None,
1274        related_query_name=None,
1275        limit_choices_to=None,
1276        symmetrical=None,
1277        through=None,
1278        through_fields=None,
1279        db_constraint=True,
1280        db_table=None,
1281        swappable=True,
1282        **kwargs,
1283    ):
1284        try:
1285            to._meta
1286        except AttributeError:
1287            if not isinstance(to, str):
1288                raise TypeError(
1289                    "{}({!r}) is invalid. First parameter to ManyToManyField "
1290                    "must be either a model, a model name, or the string {!r}".format(
1291                        self.__class__.__name__,
1292                        to,
1293                        RECURSIVE_RELATIONSHIP_CONSTANT,
1294                    )
1295                )
1296
1297        if symmetrical is None:
1298            symmetrical = to == RECURSIVE_RELATIONSHIP_CONSTANT
1299
1300        if through is not None and db_table is not None:
1301            raise ValueError(
1302                "Cannot specify a db_table if an intermediary model is used."
1303            )
1304
1305        kwargs["rel"] = self.rel_class(
1306            self,
1307            to,
1308            related_name=related_name,
1309            related_query_name=related_query_name,
1310            limit_choices_to=limit_choices_to,
1311            symmetrical=symmetrical,
1312            through=through,
1313            through_fields=through_fields,
1314            db_constraint=db_constraint,
1315        )
1316        self.has_null_arg = "null" in kwargs
1317
1318        super().__init__(
1319            related_name=related_name,
1320            related_query_name=related_query_name,
1321            limit_choices_to=limit_choices_to,
1322            **kwargs,
1323        )
1324
1325        self.db_table = db_table
1326        self.swappable = swappable
1327
1328    def check(self, **kwargs):
1329        return [
1330            *super().check(**kwargs),
1331            *self._check_unique(**kwargs),
1332            *self._check_relationship_model(**kwargs),
1333            *self._check_ignored_options(**kwargs),
1334            *self._check_table_uniqueness(**kwargs),
1335        ]
1336
1337    def _check_unique(self, **kwargs):
1338        if self.unique:
1339            return [
1340                preflight.Error(
1341                    "ManyToManyFields cannot be unique.",
1342                    obj=self,
1343                    id="fields.E330",
1344                )
1345            ]
1346        return []
1347
1348    def _check_ignored_options(self, **kwargs):
1349        warnings = []
1350
1351        if self.has_null_arg:
1352            warnings.append(
1353                preflight.Warning(
1354                    "null has no effect on ManyToManyField.",
1355                    obj=self,
1356                    id="fields.W340",
1357                )
1358            )
1359
1360        if self._validators:
1361            warnings.append(
1362                preflight.Warning(
1363                    "ManyToManyField does not support validators.",
1364                    obj=self,
1365                    id="fields.W341",
1366                )
1367            )
1368        if self.remote_field.symmetrical and self._related_name:
1369            warnings.append(
1370                preflight.Warning(
1371                    "related_name has no effect on ManyToManyField "
1372                    'with a symmetrical relationship, e.g. to "self".',
1373                    obj=self,
1374                    id="fields.W345",
1375                )
1376            )
1377        if self.db_comment:
1378            warnings.append(
1379                preflight.Warning(
1380                    "db_comment has no effect on ManyToManyField.",
1381                    obj=self,
1382                    id="fields.W346",
1383                )
1384            )
1385
1386        return warnings
1387
1388    def _check_relationship_model(self, from_model=None, **kwargs):
1389        if hasattr(self.remote_field.through, "_meta"):
1390            qualified_model_name = "{}.{}".format(
1391                self.remote_field.through._meta.package_label,
1392                self.remote_field.through.__name__,
1393            )
1394        else:
1395            qualified_model_name = self.remote_field.through
1396
1397        errors = []
1398
1399        if self.remote_field.through not in self.opts.packages.get_models(
1400            include_auto_created=True
1401        ):
1402            # The relationship model is not installed.
1403            errors.append(
1404                preflight.Error(
1405                    "Field specifies a many-to-many relation through model "
1406                    "'%s', which has not been installed." % qualified_model_name,
1407                    obj=self,
1408                    id="fields.E331",
1409                )
1410            )
1411
1412        else:
1413            assert from_model is not None, (
1414                "ManyToManyField with intermediate "
1415                "tables cannot be checked if you don't pass the model "
1416                "where the field is attached to."
1417            )
1418            # Set some useful local variables
1419            to_model = resolve_relation(from_model, self.remote_field.model)
1420            from_model_name = from_model._meta.object_name
1421            if isinstance(to_model, str):
1422                to_model_name = to_model
1423            else:
1424                to_model_name = to_model._meta.object_name
1425            relationship_model_name = self.remote_field.through._meta.object_name
1426            self_referential = from_model == to_model
1427            # Count foreign keys in intermediate model
1428            if self_referential:
1429                seen_self = sum(
1430                    from_model == getattr(field.remote_field, "model", None)
1431                    for field in self.remote_field.through._meta.fields
1432                )
1433
1434                if seen_self > 2 and not self.remote_field.through_fields:
1435                    errors.append(
1436                        preflight.Error(
1437                            "The model is used as an intermediate model by "
1438                            "'{}', but it has more than two foreign keys "
1439                            "to '{}', which is ambiguous. You must specify "
1440                            "which two foreign keys Plain should use via the "
1441                            "through_fields keyword argument.".format(
1442                                self, from_model_name
1443                            ),
1444                            hint=(
1445                                "Use through_fields to specify which two foreign keys "
1446                                "Plain should use."
1447                            ),
1448                            obj=self.remote_field.through,
1449                            id="fields.E333",
1450                        )
1451                    )
1452
1453            else:
1454                # Count foreign keys in relationship model
1455                seen_from = sum(
1456                    from_model == getattr(field.remote_field, "model", None)
1457                    for field in self.remote_field.through._meta.fields
1458                )
1459                seen_to = sum(
1460                    to_model == getattr(field.remote_field, "model", None)
1461                    for field in self.remote_field.through._meta.fields
1462                )
1463
1464                if seen_from > 1 and not self.remote_field.through_fields:
1465                    errors.append(
1466                        preflight.Error(
1467                            (
1468                                "The model is used as an intermediate model by "
1469                                "'{}', but it has more than one foreign key "
1470                                "from '{}', which is ambiguous. You must specify "
1471                                "which foreign key Plain should use via the "
1472                                "through_fields keyword argument."
1473                            ).format(self, from_model_name),
1474                            hint=(
1475                                "If you want to create a recursive relationship, "
1476                                'use ManyToManyField("{}", through="{}").'
1477                            ).format(
1478                                RECURSIVE_RELATIONSHIP_CONSTANT,
1479                                relationship_model_name,
1480                            ),
1481                            obj=self,
1482                            id="fields.E334",
1483                        )
1484                    )
1485
1486                if seen_to > 1 and not self.remote_field.through_fields:
1487                    errors.append(
1488                        preflight.Error(
1489                            "The model is used as an intermediate model by "
1490                            "'{}', but it has more than one foreign key "
1491                            "to '{}', which is ambiguous. You must specify "
1492                            "which foreign key Plain should use via the "
1493                            "through_fields keyword argument.".format(
1494                                self, to_model_name
1495                            ),
1496                            hint=(
1497                                "If you want to create a recursive relationship, "
1498                                'use ManyToManyField("{}", through="{}").'
1499                            ).format(
1500                                RECURSIVE_RELATIONSHIP_CONSTANT,
1501                                relationship_model_name,
1502                            ),
1503                            obj=self,
1504                            id="fields.E335",
1505                        )
1506                    )
1507
1508                if seen_from == 0 or seen_to == 0:
1509                    errors.append(
1510                        preflight.Error(
1511                            "The model is used as an intermediate model by "
1512                            "'{}', but it does not have a foreign key to '{}' or '{}'.".format(
1513                                self, from_model_name, to_model_name
1514                            ),
1515                            obj=self.remote_field.through,
1516                            id="fields.E336",
1517                        )
1518                    )
1519
1520        # Validate `through_fields`.
1521        if self.remote_field.through_fields is not None:
1522            # Validate that we're given an iterable of at least two items
1523            # and that none of them is "falsy".
1524            if not (
1525                len(self.remote_field.through_fields) >= 2
1526                and self.remote_field.through_fields[0]
1527                and self.remote_field.through_fields[1]
1528            ):
1529                errors.append(
1530                    preflight.Error(
1531                        "Field specifies 'through_fields' but does not provide "
1532                        "the names of the two link fields that should be used "
1533                        "for the relation through model '%s'." % qualified_model_name,
1534                        hint=(
1535                            "Make sure you specify 'through_fields' as "
1536                            "through_fields=('field1', 'field2')"
1537                        ),
1538                        obj=self,
1539                        id="fields.E337",
1540                    )
1541                )
1542
1543            # Validate the given through fields -- they should be actual
1544            # fields on the through model, and also be foreign keys to the
1545            # expected models.
1546            else:
1547                assert from_model is not None, (
1548                    "ManyToManyField with intermediate "
1549                    "tables cannot be checked if you don't pass the model "
1550                    "where the field is attached to."
1551                )
1552
1553                source, through, target = (
1554                    from_model,
1555                    self.remote_field.through,
1556                    self.remote_field.model,
1557                )
1558                source_field_name, target_field_name = self.remote_field.through_fields[
1559                    :2
1560                ]
1561
1562                for field_name, related_model in (
1563                    (source_field_name, source),
1564                    (target_field_name, target),
1565                ):
1566                    possible_field_names = []
1567                    for f in through._meta.fields:
1568                        if (
1569                            hasattr(f, "remote_field")
1570                            and getattr(f.remote_field, "model", None) == related_model
1571                        ):
1572                            possible_field_names.append(f.name)
1573                    if possible_field_names:
1574                        hint = (
1575                            "Did you mean one of the following foreign keys to '{}': "
1576                            "{}?".format(
1577                                related_model._meta.object_name,
1578                                ", ".join(possible_field_names),
1579                            )
1580                        )
1581                    else:
1582                        hint = None
1583
1584                    try:
1585                        field = through._meta.get_field(field_name)
1586                    except exceptions.FieldDoesNotExist:
1587                        errors.append(
1588                            preflight.Error(
1589                                "The intermediary model '{}' has no field '{}'.".format(
1590                                    qualified_model_name, field_name
1591                                ),
1592                                hint=hint,
1593                                obj=self,
1594                                id="fields.E338",
1595                            )
1596                        )
1597                    else:
1598                        if not (
1599                            hasattr(field, "remote_field")
1600                            and getattr(field.remote_field, "model", None)
1601                            == related_model
1602                        ):
1603                            errors.append(
1604                                preflight.Error(
1605                                    "'{}.{}' is not a foreign key to '{}'.".format(
1606                                        through._meta.object_name,
1607                                        field_name,
1608                                        related_model._meta.object_name,
1609                                    ),
1610                                    hint=hint,
1611                                    obj=self,
1612                                    id="fields.E339",
1613                                )
1614                            )
1615
1616        return errors
1617
1618    def _check_table_uniqueness(self, **kwargs):
1619        if (
1620            isinstance(self.remote_field.through, str)
1621            or not self.remote_field.through._meta.managed
1622        ):
1623            return []
1624        registered_tables = {
1625            model._meta.db_table: model
1626            for model in self.opts.packages.get_models(include_auto_created=True)
1627            if model != self.remote_field.through and model._meta.managed
1628        }
1629        m2m_db_table = self.m2m_db_table()
1630        model = registered_tables.get(m2m_db_table)
1631        # The second condition allows multiple m2m relations on a model if
1632        # some point to a through model that proxies another through model.
1633        if (
1634            model
1635            and model._meta.concrete_model
1636            != self.remote_field.through._meta.concrete_model
1637        ):
1638            if model._meta.auto_created:
1639
1640                def _get_field_name(model):
1641                    for field in model._meta.auto_created._meta.many_to_many:
1642                        if field.remote_field.through is model:
1643                            return field.name
1644
1645                opts = model._meta.auto_created._meta
1646                clashing_obj = f"{opts.label}.{_get_field_name(model)}"
1647            else:
1648                clashing_obj = model._meta.label
1649            if settings.DATABASE_ROUTERS:
1650                error_class, error_id = preflight.Warning, "fields.W344"
1651                error_hint = (
1652                    "You have configured settings.DATABASE_ROUTERS. Verify "
1653                    "that the table of %r is correctly routed to a separate "
1654                    "database." % clashing_obj
1655                )
1656            else:
1657                error_class, error_id = preflight.Error, "fields.E340"
1658                error_hint = None
1659            return [
1660                error_class(
1661                    f"The field's intermediary table '{m2m_db_table}' clashes with the "
1662                    f"table name of '{clashing_obj}'.",
1663                    obj=self,
1664                    hint=error_hint,
1665                    id=error_id,
1666                )
1667            ]
1668        return []
1669
1670    def deconstruct(self):
1671        name, path, args, kwargs = super().deconstruct()
1672        # Handle the simpler arguments.
1673        if self.db_table is not None:
1674            kwargs["db_table"] = self.db_table
1675        if self.remote_field.db_constraint is not True:
1676            kwargs["db_constraint"] = self.remote_field.db_constraint
1677        # Lowercase model names as they should be treated as case-insensitive.
1678        if isinstance(self.remote_field.model, str):
1679            if "." in self.remote_field.model:
1680                package_label, model_name = self.remote_field.model.split(".")
1681                kwargs["to"] = f"{package_label}.{model_name.lower()}"
1682            else:
1683                kwargs["to"] = self.remote_field.model.lower()
1684        else:
1685            kwargs["to"] = self.remote_field.model._meta.label_lower
1686        if getattr(self.remote_field, "through", None) is not None:
1687            if isinstance(self.remote_field.through, str):
1688                kwargs["through"] = self.remote_field.through
1689            elif not self.remote_field.through._meta.auto_created:
1690                kwargs["through"] = self.remote_field.through._meta.label
1691        # If swappable is True, then see if we're actually pointing to the target
1692        # of a swap.
1693        swappable_setting = self.swappable_setting
1694        if swappable_setting is not None:
1695            # If it's already a settings reference, error.
1696            if hasattr(kwargs["to"], "setting_name"):
1697                if kwargs["to"].setting_name != swappable_setting:
1698                    raise ValueError(
1699                        "Cannot deconstruct a ManyToManyField pointing to a "
1700                        "model that is swapped in place of more than one model "
1701                        "({} and {})".format(
1702                            kwargs["to"].setting_name, swappable_setting
1703                        )
1704                    )
1705
1706            kwargs["to"] = SettingsReference(
1707                kwargs["to"],
1708                swappable_setting,
1709            )
1710        return name, path, args, kwargs
1711
1712    def _get_path_info(self, direct=False, filtered_relation=None):
1713        """Called by both direct and indirect m2m traversal."""
1714        int_model = self.remote_field.through
1715        linkfield1 = int_model._meta.get_field(self.m2m_field_name())
1716        linkfield2 = int_model._meta.get_field(self.m2m_reverse_field_name())
1717        if direct:
1718            join1infos = linkfield1.reverse_path_infos
1719            if filtered_relation:
1720                join2infos = linkfield2.get_path_info(filtered_relation)
1721            else:
1722                join2infos = linkfield2.path_infos
1723        else:
1724            join1infos = linkfield2.reverse_path_infos
1725            if filtered_relation:
1726                join2infos = linkfield1.get_path_info(filtered_relation)
1727            else:
1728                join2infos = linkfield1.path_infos
1729        # Get join infos between the last model of join 1 and the first model
1730        # of join 2. Assume the only reason these may differ is due to model
1731        # inheritance.
1732        join1_final = join1infos[-1].to_opts
1733        join2_initial = join2infos[0].from_opts
1734        if join1_final is join2_initial:
1735            intermediate_infos = []
1736        elif issubclass(join1_final.model, join2_initial.model):
1737            intermediate_infos = join1_final.get_path_to_parent(join2_initial.model)
1738        else:
1739            intermediate_infos = join2_initial.get_path_from_parent(join1_final.model)
1740
1741        return [*join1infos, *intermediate_infos, *join2infos]
1742
1743    def get_path_info(self, filtered_relation=None):
1744        return self._get_path_info(direct=True, filtered_relation=filtered_relation)
1745
1746    @cached_property
1747    def path_infos(self):
1748        return self.get_path_info()
1749
1750    def get_reverse_path_info(self, filtered_relation=None):
1751        return self._get_path_info(direct=False, filtered_relation=filtered_relation)
1752
1753    @cached_property
1754    def reverse_path_infos(self):
1755        return self.get_reverse_path_info()
1756
1757    def _get_m2m_db_table(self, opts):
1758        """
1759        Function that can be curried to provide the m2m table name for this
1760        relation.
1761        """
1762        if self.remote_field.through is not None:
1763            return self.remote_field.through._meta.db_table
1764        elif self.db_table:
1765            return self.db_table
1766        else:
1767            m2m_table_name = f"{utils.strip_quotes(opts.db_table)}_{self.name}"
1768            return utils.truncate_name(m2m_table_name, connection.ops.max_name_length())
1769
1770    def _get_m2m_attr(self, related, attr):
1771        """
1772        Function that can be curried to provide the source accessor or DB
1773        column name for the m2m table.
1774        """
1775        cache_attr = "_m2m_%s_cache" % attr
1776        if hasattr(self, cache_attr):
1777            return getattr(self, cache_attr)
1778        if self.remote_field.through_fields is not None:
1779            link_field_name = self.remote_field.through_fields[0]
1780        else:
1781            link_field_name = None
1782        for f in self.remote_field.through._meta.fields:
1783            if (
1784                f.is_relation
1785                and f.remote_field.model == related.related_model
1786                and (link_field_name is None or link_field_name == f.name)
1787            ):
1788                setattr(self, cache_attr, getattr(f, attr))
1789                return getattr(self, cache_attr)
1790
1791    def _get_m2m_reverse_attr(self, related, attr):
1792        """
1793        Function that can be curried to provide the related accessor or DB
1794        column name for the m2m table.
1795        """
1796        cache_attr = "_m2m_reverse_%s_cache" % attr
1797        if hasattr(self, cache_attr):
1798            return getattr(self, cache_attr)
1799        found = False
1800        if self.remote_field.through_fields is not None:
1801            link_field_name = self.remote_field.through_fields[1]
1802        else:
1803            link_field_name = None
1804        for f in self.remote_field.through._meta.fields:
1805            if f.is_relation and f.remote_field.model == related.model:
1806                if link_field_name is None and related.related_model == related.model:
1807                    # If this is an m2m-intermediate to self,
1808                    # the first foreign key you find will be
1809                    # the source column. Keep searching for
1810                    # the second foreign key.
1811                    if found:
1812                        setattr(self, cache_attr, getattr(f, attr))
1813                        break
1814                    else:
1815                        found = True
1816                elif link_field_name is None or link_field_name == f.name:
1817                    setattr(self, cache_attr, getattr(f, attr))
1818                    break
1819        return getattr(self, cache_attr)
1820
1821    def contribute_to_class(self, cls, name, **kwargs):
1822        # To support multiple relations to self, it's useful to have a non-None
1823        # related name on symmetrical relations for internal reasons. The
1824        # concept doesn't make a lot of sense externally ("you want me to
1825        # specify *what* on my non-reversible relation?!"), so we set it up
1826        # automatically. The funky name reduces the chance of an accidental
1827        # clash.
1828        if self.remote_field.symmetrical and (
1829            self.remote_field.model == RECURSIVE_RELATIONSHIP_CONSTANT
1830            or self.remote_field.model == cls._meta.object_name
1831        ):
1832            self.remote_field.related_name = "%s_rel_+" % name
1833        elif self.remote_field.is_hidden():
1834            # If the backwards relation is disabled, replace the original
1835            # related_name with one generated from the m2m field name. Django
1836            # still uses backwards relations internally and we need to avoid
1837            # clashes between multiple m2m fields with related_name == '+'.
1838            self.remote_field.related_name = "_{}_{}_{}_+".format(
1839                cls._meta.package_label,
1840                cls.__name__.lower(),
1841                name,
1842            )
1843
1844        super().contribute_to_class(cls, name, **kwargs)
1845
1846        # The intermediate m2m model is not auto created if:
1847        #  1) There is a manually specified intermediate, or
1848        #  2) The class owning the m2m field is abstract.
1849        #  3) The class owning the m2m field has been swapped out.
1850        if not cls._meta.abstract:
1851            if self.remote_field.through:
1852
1853                def resolve_through_model(_, model, field):
1854                    field.remote_field.through = model
1855
1856                lazy_related_operation(
1857                    resolve_through_model, cls, self.remote_field.through, field=self
1858                )
1859            elif not cls._meta.swapped:
1860                self.remote_field.through = create_many_to_many_intermediary_model(
1861                    self, cls
1862                )
1863
1864        # Add the descriptor for the m2m relation.
1865        setattr(cls, self.name, ManyToManyDescriptor(self.remote_field, reverse=False))
1866
1867        # Set up the accessor for the m2m table name for the relation.
1868        self.m2m_db_table = partial(self._get_m2m_db_table, cls._meta)
1869
1870    def contribute_to_related_class(self, cls, related):
1871        # Internal M2Ms (i.e., those with a related name ending with '+')
1872        # and swapped models don't get a related descriptor.
1873        if (
1874            not self.remote_field.is_hidden()
1875            and not related.related_model._meta.swapped
1876        ):
1877            setattr(
1878                cls,
1879                related.get_accessor_name(),
1880                ManyToManyDescriptor(self.remote_field, reverse=True),
1881            )
1882
1883        # Set up the accessors for the column names on the m2m table.
1884        self.m2m_column_name = partial(self._get_m2m_attr, related, "column")
1885        self.m2m_reverse_name = partial(self._get_m2m_reverse_attr, related, "column")
1886
1887        self.m2m_field_name = partial(self._get_m2m_attr, related, "name")
1888        self.m2m_reverse_field_name = partial(
1889            self._get_m2m_reverse_attr, related, "name"
1890        )
1891
1892        get_m2m_rel = partial(self._get_m2m_attr, related, "remote_field")
1893        self.m2m_target_field_name = lambda: get_m2m_rel().field_name
1894        get_m2m_reverse_rel = partial(
1895            self._get_m2m_reverse_attr, related, "remote_field"
1896        )
1897        self.m2m_reverse_target_field_name = lambda: get_m2m_reverse_rel().field_name
1898
1899    def set_attributes_from_rel(self):
1900        pass
1901
1902    def value_from_object(self, obj):
1903        return [] if obj.pk is None else list(getattr(obj, self.attname).all())
1904
1905    def save_form_data(self, instance, data):
1906        getattr(instance, self.attname).set(data)
1907
1908    def db_check(self, connection):
1909        return None
1910
1911    def db_type(self, connection):
1912        # A ManyToManyField is not represented by a single column,
1913        # so return None.
1914        return None
1915
1916    def db_parameters(self, connection):
1917        return {"type": None, "check": None}