1from __future__ import annotations
  2
  3import copy
  4import inspect
  5from collections import defaultdict
  6from collections.abc import Iterable
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
  9
 10from plain.models.exceptions import FieldDoesNotExist
 11from plain.models.query import QuerySet
 12from plain.models.registry import models_registry as default_models_registry
 13from plain.utils.datastructures import ImmutableList
 14
 15if TYPE_CHECKING:
 16    from plain.models.base import Model
 17    from plain.models.fields import Field
 18    from plain.models.fields.related import ManyToManyField
 19    from plain.models.fields.reverse_related import ForeignObjectRel
 20
# Fallback returned by Meta._populate_directed_relation_graph() when no
# relation tree has been stored in the Meta instance's __dict__.
EMPTY_RELATION_TREE = ()

# Warning template handed to ImmutableList; make_immutable_fields_list()
# fills the ``%s`` placeholder with the name of the list being returned.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. If you want to manipulate this "
    "list for your own use, make a copy first."
)

# Element type of the iterables wrapped by make_immutable_fields_list().
T = TypeVar("T")
 29
 30
 31def make_immutable_fields_list(name: str, data: Iterable[T]) -> ImmutableList[T]:
 32    return ImmutableList(data, warning=IMMUTABLE_WARNING % name)
 33
 34
class Meta:
    """
    Model metadata descriptor and container.

    Acts as both a descriptor (for lazy initialization and access control)
    and the actual metadata instance (cached per model class).

    The descriptor object (built through ``__init__``) holds the optional
    registry override and the per-model cache. The per-model instances
    (built through ``_create_and_cache()``) hold ``model``,
    ``models_registry``, the local field lists and the field caches.
    """

    # Names removed from the instance ``__dict__`` by
    # ``_expire_cache(forward=True)``.
    # NOTE(review): ``base_queryset`` is declared as a plain ``@property`` in
    # this class, so it is never stored in ``__dict__`` and there is nothing
    # to expire for it -- confirm whether the entry is still needed.
    FORWARD_PROPERTIES = {
        "fields",
        "many_to_many",
        "concrete_fields",
        "local_concrete_fields",
        "_non_pk_concrete_field_names",
        "_forward_fields_map",
        "base_queryset",
    }
    # Names removed from the instance ``__dict__`` by
    # ``_expire_cache(reverse=True)``.
    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}

    # Type annotations for attributes set in _create_and_cache
    # These exist on cached instances, not on the descriptor itself
    model: type[Model]
    models_registry: Any
    _get_fields_cache: dict[Any, Any]
    local_fields: list[Field]
    local_many_to_many: list[ManyToManyField]
 61
 62    def __init__(self, models_registry: Any | None = None):
 63        """
 64        Initialize the descriptor with optional configuration.
 65
 66        This is called ONCE when defining the base Model class.
 67        The descriptor then creates cached instances per model subclass.
 68        """
 69        self._models_registry = models_registry
 70        self._cache: dict[type[Model], Meta] = {}
 71
 72    def __get__(self, instance: Any, owner: type[Model]) -> Meta:
 73        """
 74        Descriptor protocol - returns cached Meta instance for the model class.
 75
 76        This is called when accessing Model._model_meta and returns a per-class
 77        cached instance created by _create_and_cache().
 78
 79        Can be accessed from both class and instances:
 80        - MyModel._model_meta (class access)
 81        - my_instance._model_meta (instance access - returns class's metadata)
 82        """
 83        # Allow instance access - just return the class's metadata
 84        if instance is not None:
 85            owner = instance.__class__
 86
 87        # Skip for the base Model class - return descriptor
 88        if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
 89            return self
 90
 91        # Return cached instance or create new one
 92        if owner not in self._cache:
 93            # Create the instance and cache it BEFORE field contribution
 94            # to avoid infinite recursion when fields access cls._model_meta
 95            return self._create_and_cache(owner)
 96
 97        return self._cache[owner]
 98
 99    def _create_and_cache(self, model: type[Model]) -> Meta:
100        """Create Meta instance and cache it before field contribution."""
101        # Create instance without calling __init__
102        instance = Meta.__new__(Meta)
103
104        # Initialize basic model-specific state
105        instance.model = model
106        instance.models_registry = self._models_registry or default_models_registry
107        instance._get_fields_cache = {}
108        instance.local_fields = []
109        instance.local_many_to_many = []
110
111        # Cache the instance BEFORE processing fields to prevent recursion
112        self._cache[model] = instance
113
114        # Now process fields - they can safely access cls._model_meta
115        seen_attrs = set()
116        for klass in model.__mro__:
117            for attr_name in list(klass.__dict__.keys()):
118                if attr_name.startswith("_") or attr_name in seen_attrs:
119                    continue
120                seen_attrs.add(attr_name)
121
122                attr_value = klass.__dict__[attr_name]
123
124                if not inspect.isclass(attr_value) and hasattr(
125                    attr_value, "contribute_to_class"
126                ):
127                    if attr_name not in model.__dict__:
128                        field = copy.deepcopy(attr_value)
129                    else:
130                        field = attr_value
131                    field.contribute_to_class(model, attr_name)
132
133        # Sort fields: primary key first, then alphabetically by name
134        instance.local_fields.sort(key=lambda f: (not f.primary_key, f.name))
135        instance.local_many_to_many.sort(key=lambda f: f.name)
136
137        # Set index names now that fields are contributed
138        # Trigger model_options descriptor to ensure it's initialized
139        # (accessing it will cache the instance)
140        for index in model.model_options.indexes:
141            if not index.name:
142                index.set_name_with_model(model)
143
144        return instance
145
146    @property
147    def base_queryset(self) -> QuerySet:
148        """
149        The base queryset is used by Plain's internal operations like cascading
150        deletes, migrations, and related object lookups. It provides access to
151        all objects in the database without any filtering, ensuring Plain can
152        always see the complete dataset when performing framework operations.
153
154        Unlike user-defined querysets which may filter results (e.g. only active
155        objects), the base queryset must never filter out rows to prevent
156        incomplete results in related queries.
157        """
158        return QuerySet.from_model(self.model)
159
160    def add_field(self, field: Field) -> None:
161        from plain.models.fields.related import ManyToManyField, RelatedField
162
163        if isinstance(field, ManyToManyField):
164            self.local_many_to_many.append(field)
165        else:
166            self.local_fields.append(field)
167
168        # If the field being added is a relation to another known field,
169        # expire the cache on this field and the forward cache on the field
170        # being referenced, because there will be new relationships in the
171        # cache. Otherwise, expire the cache of references *to* this field.
172        # The mechanism for getting at the related model is slightly odd -
173        # ideally, we'd just ask for field.related_model. However, related_model
174        # is a cached property, and all the models haven't been loaded yet, so
175        # we need to make sure we don't cache a string reference.
176        if isinstance(field, RelatedField) and field.remote_field.model:
177            try:
178                field.remote_field.model._model_meta._expire_cache(forward=False)
179            except AttributeError:
180                pass
181            self._expire_cache()
182        else:
183            self._expire_cache(reverse=False)
184
185    @cached_property
186    def fields(self) -> ImmutableList[Field]:
187        from plain.models.fields.related import RelatedField
188
189        """
190        Return a list of all forward fields on the model and its parents,
191        excluding ManyToManyFields.
192
193        Private API intended only to be used by Plain itself; get_fields()
194        combined with filtering of field properties is the public API for
195        obtaining this field list.
196        """
197
198        # For legacy reasons, the fields property should only contain forward
199        # fields that are not private or with a m2m cardinality.
200        def is_not_an_m2m_field(f: Any) -> bool:
201            from plain.models.fields.related import ManyToManyField
202
203            return not isinstance(f, ManyToManyField)
204
205        def is_not_a_generic_relation(f: Any) -> bool:
206            from plain.models.fields.related import ForeignKeyField, ManyToManyField
207
208            # Only ForeignKeyField and ManyToManyField are valid RelatedFields
209            # Anything else is a generic relation
210            if not isinstance(f, RelatedField):
211                return True
212            return isinstance(f, ForeignKeyField | ManyToManyField)
213
214        return make_immutable_fields_list(
215            "fields",
216            (
217                f
218                for f in self._get_fields(reverse=False)
219                if is_not_an_m2m_field(f) and is_not_a_generic_relation(f)
220            ),
221        )
222
223    @cached_property
224    def concrete_fields(self) -> ImmutableList[Field]:
225        """
226        Return a list of all concrete fields on the model and its parents.
227
228        Private API intended only to be used by Plain itself; get_fields()
229        combined with filtering of field properties is the public API for
230        obtaining this field list.
231        """
232        return make_immutable_fields_list(
233            "concrete_fields", (f for f in self.fields if f.concrete)
234        )
235
236    @cached_property
237    def local_concrete_fields(self) -> ImmutableList[Field]:
238        """
239        Return a list of all concrete fields on the model.
240
241        Private API intended only to be used by Plain itself; get_fields()
242        combined with filtering of field properties is the public API for
243        obtaining this field list.
244        """
245        return make_immutable_fields_list(
246            "local_concrete_fields", (f for f in self.local_fields if f.concrete)
247        )
248
249    @cached_property
250    def many_to_many(self) -> ImmutableList[Field]:
251        """
252        Return a list of all many to many fields on the model and its parents.
253
254        Private API intended only to be used by Plain itself; get_fields()
255        combined with filtering of field properties is the public API for
256        obtaining this list.
257        """
258        from plain.models.fields.related import ManyToManyField
259
260        return make_immutable_fields_list(
261            "many_to_many",
262            (
263                f
264                for f in self._get_fields(reverse=False)
265                if isinstance(f, ManyToManyField)
266            ),
267        )
268
269    @cached_property
270    def related_objects(self) -> ImmutableList[ForeignObjectRel]:
271        """
272        Return all related objects pointing to the current model. The related
273        objects can come from a one-to-one, one-to-many, or many-to-many field
274        relation type.
275
276        Private API intended only to be used by Plain itself; get_fields()
277        combined with filtering of field properties is the public API for
278        obtaining this field list.
279        """
280        from plain.models.fields.reverse_related import ForeignKeyRel, ManyToManyRel
281
282        all_related_fields = self._get_fields(forward=False, reverse=True)
283        return make_immutable_fields_list(
284            "related_objects",
285            (
286                obj
287                for obj in all_related_fields
288                if isinstance(obj, ManyToManyRel | ForeignKeyRel)
289            ),
290        )
291
292    @cached_property
293    def _forward_fields_map(self) -> dict[str, Field]:
294        res = {}
295        fields = self._get_fields(reverse=False)
296        for field in fields:
297            res[field.name] = field
298            # Due to the way Plain's internals work, get_field() should also
299            # be able to fetch a field by attname. In the case of a concrete
300            # field with relation, includes the *_id name too
301            try:
302                res[field.attname] = field
303            except AttributeError:
304                pass
305        return res
306
307    @cached_property
308    def fields_map(self) -> dict[str, Field | ForeignObjectRel]:
309        res = {}
310        fields = self._get_fields(forward=False, reverse=True)
311        for field in fields:
312            res[field.name] = field
313            # Due to the way Plain's internals work, get_field() should also
314            # be able to fetch a field by attname. In the case of a concrete
315            # field with relation, includes the *_id name too
316            try:
317                res[field.attname] = field
318            except AttributeError:
319                pass
320        return res
321
322    def get_field(self, field_name: str) -> Field | ForeignObjectRel:
323        """
324        Return a field instance given the name of a forward or reverse field.
325        """
326        try:
327            # In order to avoid premature loading of the relation tree
328            # (expensive) we prefer checking if the field is a forward field.
329            return self._forward_fields_map[field_name]
330        except KeyError:
331            # If the app registry is not ready, reverse fields are
332            # unavailable, therefore we throw a FieldDoesNotExist exception.
333            if not self.models_registry.ready:
334                raise FieldDoesNotExist(
335                    f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
336                    "so if this is an auto-created related field, it won't "
337                    "be available yet."
338                )
339
340        try:
341            # Retrieve field instance by name from cached or just-computed
342            # field map.
343            return self.fields_map[field_name]
344        except KeyError:
345            raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")
346
347    def get_forward_field(self, field_name: str) -> Field:
348        """
349        Return a forward field instance given the field name.
350
351        Raises FieldDoesNotExist if the field doesn't exist or is a reverse relation.
352        """
353        try:
354            return self._forward_fields_map[field_name]
355        except KeyError:
356            raise FieldDoesNotExist(
357                f"{self.model} has no forward field named '{field_name}'"
358            )
359
360    def get_reverse_relation(self, field_name: str) -> ForeignObjectRel:
361        """
362        Return a reverse relation instance given the field name.
363
364        Raises FieldDoesNotExist if the field doesn't exist or is a forward field.
365        """
366        # If the app registry is not ready, reverse fields are unavailable
367        if not self.models_registry.ready:
368            raise FieldDoesNotExist(
369                f"{self.model} has no reverse relation named '{field_name}'. The app cache isn't ready yet."
370            )
371
372        # Check if it's a forward field first
373        if field_name in self._forward_fields_map:
374            raise FieldDoesNotExist(
375                f"'{field_name}' is a forward field, not a reverse relation"
376            )
377
378        try:
379            return self.fields_map[field_name]
380        except KeyError:
381            raise FieldDoesNotExist(
382                f"{self.model} has no reverse relation named '{field_name}'"
383            )
384
385    def _populate_directed_relation_graph(self) -> list[Field]:
386        from plain.models.fields.related import RelatedField
387
388        """
389        This method is used by each model to find its reverse objects. As this
390        method is very expensive and is accessed frequently (it looks up every
391        field in a model, in every app), it is computed on first access and then
392        is set as a property on every model.
393        """
394        related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)
395
396        all_models = self.models_registry.get_models()
397        for model in all_models:
398            meta = model._model_meta
399
400            fields_with_relations = (
401                f
402                for f in meta._get_fields(reverse=False)
403                if isinstance(f, RelatedField)
404            )
405            for f in fields_with_relations:
406                if not isinstance(f.remote_field.model, str):
407                    remote_label = f.remote_field.model.model_options.label
408                    related_objects_graph[remote_label].append(f)
409
410        for model in all_models:
411            # Set the relation_tree using the internal __dict__. In this way
412            # we avoid calling the cached property. In attribute lookup,
413            # __dict__ takes precedence over a data descriptor (such as
414            # @cached_property). This means that the _model_meta._relation_tree is
415            # only called if related_objects is not in __dict__.
416            related_objects = related_objects_graph[model.model_options.label]
417            model._model_meta.__dict__["_relation_tree"] = related_objects
418        # It seems it is possible that self is not in all_models, so guard
419        # against that with default for get().
420        return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
421
422    @cached_property
423    def _relation_tree(self) -> list[Field]:
424        return self._populate_directed_relation_graph()
425
426    def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
427        # This method is usually called by packages.cache_clear(), when the
428        # registry is finalized, or when a new field is added.
429        if forward:
430            for cache_key in self.FORWARD_PROPERTIES:
431                if cache_key in self.__dict__:
432                    delattr(self, cache_key)
433        if reverse:
434            for cache_key in self.REVERSE_PROPERTIES:
435                if cache_key in self.__dict__:
436                    delattr(self, cache_key)
437        self._get_fields_cache = {}
438
439    @overload
440    def get_fields(
441        self, include_reverse: Literal[False] = False
442    ) -> ImmutableList[Field]: ...
443
444    @overload
445    def get_fields(
446        self, include_reverse: Literal[True]
447    ) -> ImmutableList[Field | ForeignObjectRel]: ...
448
449    def get_fields(
450        self, include_reverse: bool = False
451    ) -> ImmutableList[Field | ForeignObjectRel]:
452        """
453        Return a list of fields associated to the model.
454
455        By default, returns only forward fields (fields explicitly defined on
456        this model). Set include_reverse=True to also include reverse relations
457        (fields from other models that point to this model).
458
459        Args:
460            include_reverse: Include reverse relation fields (fields from other
461                           models pointing to this model). Needed for framework
462                           operations like migrations and deletion cascading.
463        """
464        return self._get_fields(reverse=include_reverse)
465
466    @overload
467    def _get_fields(
468        self,
469        *,
470        forward: Literal[True] = True,
471        reverse: Literal[False],
472        seen_models: set[type[Any]] | None = None,
473    ) -> ImmutableList[Field]: ...
474
475    @overload
476    def _get_fields(
477        self,
478        *,
479        forward: Literal[False],
480        reverse: Literal[True] = True,
481        seen_models: set[type[Any]] | None = None,
482    ) -> ImmutableList[ForeignObjectRel]: ...
483
484    @overload
485    def _get_fields(
486        self,
487        *,
488        forward: bool = True,
489        reverse: bool = True,
490        seen_models: set[type[Any]] | None = None,
491    ) -> ImmutableList[Field | ForeignObjectRel]: ...
492
493    def _get_fields(
494        self,
495        *,
496        forward: bool = True,
497        reverse: bool = True,
498        seen_models: set[type[Any]] | None = None,
499    ) -> ImmutableList[Field | ForeignObjectRel]:
500        """
501        Internal helper function to return fields of the model.
502
503        Args:
504            forward: If True, fields defined on this model are returned.
505            reverse: If True, reverse relations (fields from other models
506                    pointing to this model) are returned.
507            seen_models: Track visited models to prevent duplicates in recursion.
508        """
509
510        # This helper function is used to allow recursion in ``get_fields()``
511        # implementation and to provide a fast way for Plain's internals to
512        # access specific subsets of fields.
513
514        # We must keep track of which models we have already seen. Otherwise we
515        # could include the same field multiple times from different models.
516        topmost_call = seen_models is None
517        if seen_models is None:
518            seen_models = set()
519        seen_models.add(self.model)
520
521        # Creates a cache key composed of all arguments
522        cache_key = (forward, reverse, topmost_call)
523
524        try:
525            # In order to avoid list manipulation. Always return a shallow copy
526            # of the results.
527            return self._get_fields_cache[cache_key]
528        except KeyError:
529            pass
530
531        fields = []
532
533        if reverse:
534            # Tree is computed once and cached until the app cache is expired.
535            # It is composed of a list of fields from other models pointing to
536            # the current model (reverse relations).
537            all_fields = self._relation_tree
538            for field in all_fields:
539                fields.append(field.remote_field)
540
541        if forward:
542            fields += self.local_fields
543            fields += self.local_many_to_many
544
545        # In order to avoid list manipulation. Always
546        # return a shallow copy of the results
547        fields = make_immutable_fields_list("get_fields()", fields)
548
549        # Store result into cache for later access
550        self._get_fields_cache[cache_key] = fields
551        return fields
552
553    @cached_property
554    def _property_names(self) -> frozenset[str]:
555        """Return a set of the names of the properties defined on the model."""
556        names = []
557        for name in dir(self.model):
558            attr = inspect.getattr_static(self.model, name)
559            if isinstance(attr, property):
560                names.append(name)
561        return frozenset(names)
562
563    @cached_property
564    def _non_pk_concrete_field_names(self) -> frozenset[str]:
565        """
566        Return a set of the non-primary key concrete field names defined on the model.
567        """
568        names = []
569        for field in self.concrete_fields:
570            if not field.primary_key:
571                names.append(field.name)
572                if field.name != field.attname:
573                    names.append(field.attname)
574        return frozenset(names)
575
576    @cached_property
577    def db_returning_fields(self) -> list[Field]:
578        """
579        Private API intended only to be used by Plain itself.
580        Fields to be returned after a database insert.
581        """
582        return [
583            field
584            for field in self._get_fields(forward=True, reverse=False)
585            if getattr(field, "db_returning", False)
586        ]