Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import bisect
  4import copy
  5import inspect
  6from collections import defaultdict
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any
  9
 10from plain.models.exceptions import FieldDoesNotExist
 11from plain.models.query import QuerySet
 12from plain.models.registry import models_registry as default_models_registry
 13from plain.utils.datastructures import ImmutableList
 14
 15if TYPE_CHECKING:
 16    from plain.models.base import Model
 17    from plain.models.fields import Field
 18
# Fallback relation tree: used as the default when a Meta instance has no
# "_relation_tree" entry in its __dict__ after the relation graph is populated.
EMPTY_RELATION_TREE = ()

# Warning template handed to ImmutableList; "%s" is replaced with the name of
# the property (or method) whose return value must not be mutated.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. If you want to manipulate this "
    "list for your own use, make a copy first."
)
 25
 26
 27def make_immutable_fields_list(name: str, data: Any) -> ImmutableList:
 28    return ImmutableList(data, warning=IMMUTABLE_WARNING % name)
 29
 30
class Meta:
    """
    Model metadata descriptor and container.

    Acts as both a descriptor (for lazy initialization and access control)
    and the actual metadata instance (cached per model class).

    The descriptor instance is built through __init__ and owns the per-model
    cache; the per-model instances are built in _create_and_cache() via
    Meta.__new__ and therefore never run __init__.
    """

    # Names of cached attributes removed from the instance __dict__ by
    # _expire_cache(forward=True).
    # NOTE(review): "base_queryset" is declared with @property rather than
    # @cached_property, so it is not stored in the instance __dict__ and this
    # entry appears to be a no-op - confirm.
    # NOTE(review): db_returning_fields and _property_names are cached
    # properties that are not listed here, so _expire_cache() never clears
    # them - confirm this is intentional.
    FORWARD_PROPERTIES = {
        "fields",
        "many_to_many",
        "concrete_fields",
        "local_concrete_fields",
        "_non_pk_concrete_field_names",
        "_forward_fields_map",
        "base_queryset",
    }
    # Names of cached attributes removed from the instance __dict__ by
    # _expire_cache(reverse=True).
    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}

    # Type annotations for attributes set in _create_and_cache.
    # These exist on cached (per-model) instances, not on the descriptor itself.
    model: type[Model]  # the model class this metadata describes
    models_registry: Any  # registry override or the default models registry
    _get_fields_cache: dict[Any, Any]  # memo for _get_fields(), keyed by its arguments
    local_fields: list[Field]  # non many-to-many fields added through add_field()
    local_many_to_many: list[Field]  # many-to-many fields added through add_field()
    related_fkey_lookups: list[Any]  # initialised empty; not used elsewhere in this class
 58
 59    def __init__(self, models_registry: Any | None = None):
 60        """
 61        Initialize the descriptor with optional configuration.
 62
 63        This is called ONCE when defining the base Model class.
 64        The descriptor then creates cached instances per model subclass.
 65        """
 66        self._models_registry = models_registry
 67        self._cache: dict[type[Model], Meta] = {}
 68
 69    def __get__(self, instance: Any, owner: type[Model]) -> Meta:
 70        """
 71        Descriptor protocol - returns cached Meta instance for the model class.
 72
 73        This is called when accessing Model._model_meta and returns a per-class
 74        cached instance created by _create_and_cache().
 75
 76        Can be accessed from both class and instances:
 77        - MyModel._model_meta (class access)
 78        - my_instance._model_meta (instance access - returns class's metadata)
 79        """
 80        # Allow instance access - just return the class's metadata
 81        if instance is not None:
 82            owner = instance.__class__
 83
 84        # Skip for the base Model class - return descriptor
 85        if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
 86            return self  # type: ignore
 87
 88        # Return cached instance or create new one
 89        if owner not in self._cache:
 90            # Create the instance and cache it BEFORE field contribution
 91            # to avoid infinite recursion when fields access cls._model_meta
 92            return self._create_and_cache(owner)
 93
 94        return self._cache[owner]
 95
 96    def _create_and_cache(self, model: type[Model]) -> Meta:
 97        """Create Meta instance and cache it before field contribution."""
 98        # Create instance without calling __init__
 99        instance = Meta.__new__(Meta)
100
101        # Initialize basic model-specific state
102        instance.model = model
103        instance.models_registry = self._models_registry or default_models_registry
104        instance._get_fields_cache = {}
105        instance.local_fields = []
106        instance.local_many_to_many = []
107        instance.related_fkey_lookups = []
108
109        # Cache the instance BEFORE processing fields to prevent recursion
110        self._cache[model] = instance
111
112        # Now process fields - they can safely access cls._model_meta
113        seen_attrs = set()
114        for klass in model.__mro__:
115            for attr_name in list(klass.__dict__.keys()):
116                if attr_name.startswith("_") or attr_name in seen_attrs:
117                    continue
118                seen_attrs.add(attr_name)
119
120                attr_value = klass.__dict__[attr_name]
121
122                if not inspect.isclass(attr_value) and hasattr(
123                    attr_value, "contribute_to_class"
124                ):
125                    if attr_name not in model.__dict__:
126                        field = copy.deepcopy(attr_value)
127                    else:
128                        field = attr_value
129                    field.contribute_to_class(model, attr_name)
130
131        # Set index names now that fields are contributed
132        # Trigger model_options descriptor to ensure it's initialized
133        # (accessing it will cache the instance)
134        for index in model.model_options.indexes:
135            if not index.name:
136                index.set_name_with_model(model)
137
138        return instance
139
140    @property
141    def base_queryset(self) -> QuerySet:
142        """
143        The base queryset is used by Plain's internal operations like cascading
144        deletes, migrations, and related object lookups. It provides access to
145        all objects in the database without any filtering, ensuring Plain can
146        always see the complete dataset when performing framework operations.
147
148        Unlike user-defined querysets which may filter results (e.g. only active
149        objects), the base queryset must never filter out rows to prevent
150        incomplete results in related queries.
151        """
152        return QuerySet.from_model(self.model)
153
154    def add_field(self, field: Field) -> None:
155        # Insert the given field in the order in which it was created, using
156        # the "creation_counter" attribute of the field.
157        # Move many-to-many related fields from self.fields into
158        # self.many_to_many.
159        if field.is_relation and field.many_to_many:
160            bisect.insort(self.local_many_to_many, field)
161        else:
162            bisect.insort(self.local_fields, field)
163
164        # If the field being added is a relation to another known field,
165        # expire the cache on this field and the forward cache on the field
166        # being referenced, because there will be new relationships in the
167        # cache. Otherwise, expire the cache of references *to* this field.
168        # The mechanism for getting at the related model is slightly odd -
169        # ideally, we'd just ask for field.related_model. However, related_model
170        # is a cached property, and all the models haven't been loaded yet, so
171        # we need to make sure we don't cache a string reference.
172        if (
173            field.is_relation
174            and hasattr(field.remote_field, "model")
175            and field.remote_field.model
176        ):
177            try:
178                field.remote_field.model._model_meta._expire_cache(forward=False)
179            except AttributeError:
180                pass
181            self._expire_cache()
182        else:
183            self._expire_cache(reverse=False)
184
185    @cached_property
186    def fields(self) -> ImmutableList:
187        """
188        Return a list of all forward fields on the model and its parents,
189        excluding ManyToManyFields.
190
191        Private API intended only to be used by Plain itself; get_fields()
192        combined with filtering of field properties is the public API for
193        obtaining this field list.
194        """
195
196        # For legacy reasons, the fields property should only contain forward
197        # fields that are not private or with a m2m cardinality. Therefore we
198        # pass these three filters as filters to the generator.
199        # The third lambda is a longwinded way of checking f.related_model - we don't
200        # use that property directly because related_model is a cached property,
201        # and all the models may not have been loaded yet; we don't want to cache
202        # the string reference to the related_model.
203        def is_not_an_m2m_field(f: Any) -> bool:
204            return not (f.is_relation and f.many_to_many)
205
206        def is_not_a_generic_relation(f: Any) -> bool:
207            return not (f.is_relation and f.one_to_many)
208
209        def is_not_a_generic_foreign_key(f: Any) -> bool:
210            return not (
211                f.is_relation
212                and f.many_to_one
213                and not (hasattr(f.remote_field, "model") and f.remote_field.model)
214            )
215
216        return make_immutable_fields_list(
217            "fields",
218            (
219                f
220                for f in self._get_fields(reverse=False)
221                if is_not_an_m2m_field(f)
222                and is_not_a_generic_relation(f)
223                and is_not_a_generic_foreign_key(f)
224            ),
225        )
226
227    @cached_property
228    def concrete_fields(self) -> ImmutableList:
229        """
230        Return a list of all concrete fields on the model and its parents.
231
232        Private API intended only to be used by Plain itself; get_fields()
233        combined with filtering of field properties is the public API for
234        obtaining this field list.
235        """
236        return make_immutable_fields_list(
237            "concrete_fields", (f for f in self.fields if f.concrete)
238        )
239
240    @cached_property
241    def local_concrete_fields(self) -> ImmutableList:
242        """
243        Return a list of all concrete fields on the model.
244
245        Private API intended only to be used by Plain itself; get_fields()
246        combined with filtering of field properties is the public API for
247        obtaining this field list.
248        """
249        return make_immutable_fields_list(
250            "local_concrete_fields", (f for f in self.local_fields if f.concrete)
251        )
252
253    @cached_property
254    def many_to_many(self) -> ImmutableList:
255        """
256        Return a list of all many to many fields on the model and its parents.
257
258        Private API intended only to be used by Plain itself; get_fields()
259        combined with filtering of field properties is the public API for
260        obtaining this list.
261        """
262        return make_immutable_fields_list(
263            "many_to_many",
264            (
265                f
266                for f in self._get_fields(reverse=False)
267                if f.is_relation and f.many_to_many
268            ),
269        )
270
271    @cached_property
272    def related_objects(self) -> ImmutableList:
273        """
274        Return all related objects pointing to the current model. The related
275        objects can come from a one-to-one, one-to-many, or many-to-many field
276        relation type.
277
278        Private API intended only to be used by Plain itself; get_fields()
279        combined with filtering of field properties is the public API for
280        obtaining this field list.
281        """
282        all_related_fields = self._get_fields(
283            forward=False, reverse=True, include_hidden=True
284        )
285        return make_immutable_fields_list(
286            "related_objects",
287            (
288                obj
289                for obj in all_related_fields
290                if not obj.hidden or obj.field.many_to_many
291            ),
292        )
293
294    @cached_property
295    def _forward_fields_map(self) -> dict[str, Any]:
296        res = {}
297        fields = self._get_fields(reverse=False)
298        for field in fields:
299            res[field.name] = field
300            # Due to the way Plain's internals work, get_field() should also
301            # be able to fetch a field by attname. In the case of a concrete
302            # field with relation, includes the *_id name too
303            try:
304                res[field.attname] = field
305            except AttributeError:
306                pass
307        return res
308
309    @cached_property
310    def fields_map(self) -> dict[str, Any]:
311        res = {}
312        fields = self._get_fields(forward=False, include_hidden=True)
313        for field in fields:
314            res[field.name] = field
315            # Due to the way Plain's internals work, get_field() should also
316            # be able to fetch a field by attname. In the case of a concrete
317            # field with relation, includes the *_id name too
318            try:
319                res[field.attname] = field
320            except AttributeError:
321                pass
322        return res
323
324    def get_field(self, field_name: str) -> Any:
325        """
326        Return a field instance given the name of a forward or reverse field.
327        """
328        try:
329            # In order to avoid premature loading of the relation tree
330            # (expensive) we prefer checking if the field is a forward field.
331            return self._forward_fields_map[field_name]
332        except KeyError:
333            # If the app registry is not ready, reverse fields are
334            # unavailable, therefore we throw a FieldDoesNotExist exception.
335            if not self.models_registry.ready:
336                raise FieldDoesNotExist(
337                    f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
338                    "so if this is an auto-created related field, it won't "
339                    "be available yet."
340                )
341
342        try:
343            # Retrieve field instance by name from cached or just-computed
344            # field map.
345            return self.fields_map[field_name]
346        except KeyError:
347            raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")
348
349    def _populate_directed_relation_graph(self) -> Any:
350        """
351        This method is used by each model to find its reverse objects. As this
352        method is very expensive and is accessed frequently (it looks up every
353        field in a model, in every app), it is computed on first access and then
354        is set as a property on every model.
355        """
356        related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)
357
358        all_models = self.models_registry.get_models()
359        for model in all_models:
360            meta = model._model_meta
361
362            fields_with_relations = (
363                f
364                for f in meta._get_fields(reverse=False)
365                if f.is_relation and f.related_model is not None
366            )
367            for f in fields_with_relations:
368                if not isinstance(f.remote_field.model, str):
369                    remote_label = f.remote_field.model.model_options.label
370                    related_objects_graph[remote_label].append(f)
371
372        for model in all_models:
373            # Set the relation_tree using the internal __dict__. In this way
374            # we avoid calling the cached property. In attribute lookup,
375            # __dict__ takes precedence over a data descriptor (such as
376            # @cached_property). This means that the _model_meta._relation_tree is
377            # only called if related_objects is not in __dict__.
378            related_objects = related_objects_graph[model.model_options.label]
379            model._model_meta.__dict__["_relation_tree"] = related_objects
380        # It seems it is possible that self is not in all_models, so guard
381        # against that with default for get().
382        return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
383
384    @cached_property
385    def _relation_tree(self) -> Any:
386        return self._populate_directed_relation_graph()
387
388    def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
389        # This method is usually called by packages.cache_clear(), when the
390        # registry is finalized, or when a new field is added.
391        if forward:
392            for cache_key in self.FORWARD_PROPERTIES:
393                if cache_key in self.__dict__:
394                    delattr(self, cache_key)
395        if reverse:
396            for cache_key in self.REVERSE_PROPERTIES:
397                if cache_key in self.__dict__:
398                    delattr(self, cache_key)
399        self._get_fields_cache = {}
400
401    def get_fields(self, include_hidden: bool = False) -> ImmutableList:
402        """
403        Return a list of fields associated to the model. By default, include
404        forward and reverse fields, fields derived from inheritance, but not
405        hidden fields. The returned fields can be changed using the parameters:
406
407        - include_hidden:  include fields that have a related_name that
408                           starts with a "+"
409        """
410        return self._get_fields(include_hidden=include_hidden)
411
412    def _get_fields(
413        self,
414        forward: bool = True,
415        reverse: bool = True,
416        include_hidden: bool = False,
417        seen_models: set[type[Any]] | None = None,
418    ) -> ImmutableList:
419        """
420        Internal helper function to return fields of the model.
421        * If forward=True, then fields defined on this model are returned.
422        * If reverse=True, then relations pointing to this model are returned.
423        * If include_hidden=True, then fields with is_hidden=True are returned.
424        """
425
426        # This helper function is used to allow recursion in ``get_fields()``
427        # implementation and to provide a fast way for Plain's internals to
428        # access specific subsets of fields.
429
430        # We must keep track of which models we have already seen. Otherwise we
431        # could include the same field multiple times from different models.
432        topmost_call = seen_models is None
433        if seen_models is None:
434            seen_models = set()
435        seen_models.add(self.model)
436
437        # Creates a cache key composed of all arguments
438        cache_key = (forward, reverse, include_hidden, topmost_call)
439
440        try:
441            # In order to avoid list manipulation. Always return a shallow copy
442            # of the results.
443            return self._get_fields_cache[cache_key]
444        except KeyError:
445            pass
446
447        fields = []
448
449        if reverse:
450            # Tree is computed once and cached until the app cache is expired.
451            # It is composed of a list of fields pointing to the current model
452            # from other models.
453            all_fields = self._relation_tree
454            for field in all_fields:
455                # If hidden fields should be included or the relation is not
456                # intentionally hidden, add to the fields dict.
457                if include_hidden or not field.remote_field.hidden:
458                    fields.append(field.remote_field)
459
460        if forward:
461            fields += self.local_fields
462            fields += self.local_many_to_many
463
464        # In order to avoid list manipulation. Always
465        # return a shallow copy of the results
466        fields = make_immutable_fields_list("get_fields()", fields)
467
468        # Store result into cache for later access
469        self._get_fields_cache[cache_key] = fields
470        return fields
471
472    @cached_property
473    def _property_names(self) -> frozenset[str]:
474        """Return a set of the names of the properties defined on the model."""
475        names = []
476        for name in dir(self.model):
477            attr = inspect.getattr_static(self.model, name)
478            if isinstance(attr, property):
479                names.append(name)
480        return frozenset(names)
481
482    @cached_property
483    def _non_pk_concrete_field_names(self) -> frozenset[str]:
484        """
485        Return a set of the non-primary key concrete field names defined on the model.
486        """
487        names = []
488        for field in self.concrete_fields:
489            if not field.primary_key:
490                names.append(field.name)
491                if field.name != field.attname:
492                    names.append(field.attname)
493        return frozenset(names)
494
495    @cached_property
496    def db_returning_fields(self) -> list[Field]:
497        """
498        Private API intended only to be used by Plain itself.
499        Fields to be returned after a database insert.
500        """
501        return [
502            field
503            for field in self._get_fields(forward=True, reverse=False)
504            if getattr(field, "db_returning", False)
505        ]