Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import bisect
  4import copy
  5import inspect
  6from collections import defaultdict
  7from functools import cached_property
  8from typing import TYPE_CHECKING, Any
  9
 10from plain.models.exceptions import FieldDoesNotExist
 11from plain.models.query import QuerySet
 12from plain.models.registry import models_registry as default_models_registry
 13from plain.utils.datastructures import ImmutableList
 14
 15if TYPE_CHECKING:
 16    from plain.models.base import Model
 17    from plain.models.fields import Field
 18
 19EMPTY_RELATION_TREE = ()
 20
 21IMMUTABLE_WARNING = (
 22    "The return type of '%s' should never be mutated. If you want to manipulate this "
 23    "list for your own use, make a copy first."
 24)
 25
 26
 27def make_immutable_fields_list(name: str, data: Any) -> ImmutableList:
 28    return ImmutableList(data, warning=IMMUTABLE_WARNING % name)
 29
 30
 31class Meta:
 32    """
 33    Model metadata descriptor and container.
 34
 35    Acts as both a descriptor (for lazy initialization and access control)
 36    and the actual metadata instance (cached per model class).
 37    """
 38
 39    FORWARD_PROPERTIES = {
 40        "fields",
 41        "many_to_many",
 42        "concrete_fields",
 43        "local_concrete_fields",
 44        "_non_pk_concrete_field_names",
 45        "_forward_fields_map",
 46        "base_queryset",
 47    }
 48    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}
 49
 50    # Type annotations for attributes set in _create_and_cache
 51    # These exist on cached instances, not on the descriptor itself
 52    model: type[Model]
 53    models_registry: Any
 54    _get_fields_cache: dict[Any, Any]
 55    local_fields: list[Field]
 56    local_many_to_many: list[Field]
 57
 58    def __init__(self, models_registry: Any | None = None):
 59        """
 60        Initialize the descriptor with optional configuration.
 61
 62        This is called ONCE when defining the base Model class.
 63        The descriptor then creates cached instances per model subclass.
 64        """
 65        self._models_registry = models_registry
 66        self._cache: dict[type[Model], Meta] = {}
 67
 68    def __get__(self, instance: Any, owner: type[Model]) -> Meta:
 69        """
 70        Descriptor protocol - returns cached Meta instance for the model class.
 71
 72        This is called when accessing Model._model_meta and returns a per-class
 73        cached instance created by _create_and_cache().
 74
 75        Can be accessed from both class and instances:
 76        - MyModel._model_meta (class access)
 77        - my_instance._model_meta (instance access - returns class's metadata)
 78        """
 79        # Allow instance access - just return the class's metadata
 80        if instance is not None:
 81            owner = instance.__class__
 82
 83        # Skip for the base Model class - return descriptor
 84        if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
 85            return self  # type: ignore
 86
 87        # Return cached instance or create new one
 88        if owner not in self._cache:
 89            # Create the instance and cache it BEFORE field contribution
 90            # to avoid infinite recursion when fields access cls._model_meta
 91            return self._create_and_cache(owner)
 92
 93        return self._cache[owner]
 94
 95    def _create_and_cache(self, model: type[Model]) -> Meta:
 96        """Create Meta instance and cache it before field contribution."""
 97        # Create instance without calling __init__
 98        instance = Meta.__new__(Meta)
 99
100        # Initialize basic model-specific state
101        instance.model = model
102        instance.models_registry = self._models_registry or default_models_registry
103        instance._get_fields_cache = {}
104        instance.local_fields = []
105        instance.local_many_to_many = []
106
107        # Cache the instance BEFORE processing fields to prevent recursion
108        self._cache[model] = instance
109
110        # Now process fields - they can safely access cls._model_meta
111        seen_attrs = set()
112        for klass in model.__mro__:
113            for attr_name in list(klass.__dict__.keys()):
114                if attr_name.startswith("_") or attr_name in seen_attrs:
115                    continue
116                seen_attrs.add(attr_name)
117
118                attr_value = klass.__dict__[attr_name]
119
120                if not inspect.isclass(attr_value) and hasattr(
121                    attr_value, "contribute_to_class"
122                ):
123                    if attr_name not in model.__dict__:
124                        field = copy.deepcopy(attr_value)
125                    else:
126                        field = attr_value
127                    field.contribute_to_class(model, attr_name)
128
129        # Set index names now that fields are contributed
130        # Trigger model_options descriptor to ensure it's initialized
131        # (accessing it will cache the instance)
132        for index in model.model_options.indexes:
133            if not index.name:
134                index.set_name_with_model(model)
135
136        return instance
137
138    @property
139    def base_queryset(self) -> QuerySet:
140        """
141        The base queryset is used by Plain's internal operations like cascading
142        deletes, migrations, and related object lookups. It provides access to
143        all objects in the database without any filtering, ensuring Plain can
144        always see the complete dataset when performing framework operations.
145
146        Unlike user-defined querysets which may filter results (e.g. only active
147        objects), the base queryset must never filter out rows to prevent
148        incomplete results in related queries.
149        """
150        return QuerySet.from_model(self.model)
151
152    def add_field(self, field: Field) -> None:
153        # Insert the given field in the order in which it was created, using
154        # the "creation_counter" attribute of the field.
155        # Move many-to-many related fields from self.fields into
156        # self.many_to_many.
157        if field.is_relation and field.many_to_many:
158            bisect.insort(self.local_many_to_many, field)
159        else:
160            bisect.insort(self.local_fields, field)
161
162        # If the field being added is a relation to another known field,
163        # expire the cache on this field and the forward cache on the field
164        # being referenced, because there will be new relationships in the
165        # cache. Otherwise, expire the cache of references *to* this field.
166        # The mechanism for getting at the related model is slightly odd -
167        # ideally, we'd just ask for field.related_model. However, related_model
168        # is a cached property, and all the models haven't been loaded yet, so
169        # we need to make sure we don't cache a string reference.
170        if (
171            field.is_relation
172            and hasattr(field.remote_field, "model")
173            and field.remote_field.model
174        ):
175            try:
176                field.remote_field.model._model_meta._expire_cache(forward=False)
177            except AttributeError:
178                pass
179            self._expire_cache()
180        else:
181            self._expire_cache(reverse=False)
182
183    @cached_property
184    def fields(self) -> ImmutableList:
185        """
186        Return a list of all forward fields on the model and its parents,
187        excluding ManyToManyFields.
188
189        Private API intended only to be used by Plain itself; get_fields()
190        combined with filtering of field properties is the public API for
191        obtaining this field list.
192        """
193
194        # For legacy reasons, the fields property should only contain forward
195        # fields that are not private or with a m2m cardinality. Therefore we
196        # pass these three filters as filters to the generator.
197        # The third lambda is a longwinded way of checking f.related_model - we don't
198        # use that property directly because related_model is a cached property,
199        # and all the models may not have been loaded yet; we don't want to cache
200        # the string reference to the related_model.
201        def is_not_an_m2m_field(f: Any) -> bool:
202            return not (f.is_relation and f.many_to_many)
203
204        def is_not_a_generic_relation(f: Any) -> bool:
205            return not (f.is_relation and f.one_to_many)
206
207        def is_not_a_generic_foreign_key(f: Any) -> bool:
208            return not (
209                f.is_relation
210                and f.many_to_one
211                and not (hasattr(f.remote_field, "model") and f.remote_field.model)
212            )
213
214        return make_immutable_fields_list(
215            "fields",
216            (
217                f
218                for f in self._get_fields(reverse=False)
219                if is_not_an_m2m_field(f)
220                and is_not_a_generic_relation(f)
221                and is_not_a_generic_foreign_key(f)
222            ),
223        )
224
225    @cached_property
226    def concrete_fields(self) -> ImmutableList:
227        """
228        Return a list of all concrete fields on the model and its parents.
229
230        Private API intended only to be used by Plain itself; get_fields()
231        combined with filtering of field properties is the public API for
232        obtaining this field list.
233        """
234        return make_immutable_fields_list(
235            "concrete_fields", (f for f in self.fields if f.concrete)
236        )
237
238    @cached_property
239    def local_concrete_fields(self) -> ImmutableList:
240        """
241        Return a list of all concrete fields on the model.
242
243        Private API intended only to be used by Plain itself; get_fields()
244        combined with filtering of field properties is the public API for
245        obtaining this field list.
246        """
247        return make_immutable_fields_list(
248            "local_concrete_fields", (f for f in self.local_fields if f.concrete)
249        )
250
251    @cached_property
252    def many_to_many(self) -> ImmutableList:
253        """
254        Return a list of all many to many fields on the model and its parents.
255
256        Private API intended only to be used by Plain itself; get_fields()
257        combined with filtering of field properties is the public API for
258        obtaining this list.
259        """
260        return make_immutable_fields_list(
261            "many_to_many",
262            (
263                f
264                for f in self._get_fields(reverse=False)
265                if f.is_relation and f.many_to_many
266            ),
267        )
268
269    @cached_property
270    def related_objects(self) -> ImmutableList:
271        """
272        Return all related objects pointing to the current model. The related
273        objects can come from a one-to-one, one-to-many, or many-to-many field
274        relation type.
275
276        Private API intended only to be used by Plain itself; get_fields()
277        combined with filtering of field properties is the public API for
278        obtaining this field list.
279        """
280        all_related_fields = self._get_fields(forward=False, reverse=True)
281        return make_immutable_fields_list(
282            "related_objects",
283            (obj for obj in all_related_fields if obj.many_to_many or obj.one_to_many),
284        )
285
286    @cached_property
287    def _forward_fields_map(self) -> dict[str, Any]:
288        res = {}
289        fields = self._get_fields(reverse=False)
290        for field in fields:
291            res[field.name] = field
292            # Due to the way Plain's internals work, get_field() should also
293            # be able to fetch a field by attname. In the case of a concrete
294            # field with relation, includes the *_id name too
295            try:
296                res[field.attname] = field
297            except AttributeError:
298                pass
299        return res
300
301    @cached_property
302    def fields_map(self) -> dict[str, Any]:
303        res = {}
304        fields = self._get_fields(forward=False, reverse=True)
305        for field in fields:
306            res[field.name] = field
307            # Due to the way Plain's internals work, get_field() should also
308            # be able to fetch a field by attname. In the case of a concrete
309            # field with relation, includes the *_id name too
310            try:
311                res[field.attname] = field
312            except AttributeError:
313                pass
314        return res
315
316    def get_field(self, field_name: str) -> Any:
317        """
318        Return a field instance given the name of a forward or reverse field.
319        """
320        try:
321            # In order to avoid premature loading of the relation tree
322            # (expensive) we prefer checking if the field is a forward field.
323            return self._forward_fields_map[field_name]
324        except KeyError:
325            # If the app registry is not ready, reverse fields are
326            # unavailable, therefore we throw a FieldDoesNotExist exception.
327            if not self.models_registry.ready:
328                raise FieldDoesNotExist(
329                    f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
330                    "so if this is an auto-created related field, it won't "
331                    "be available yet."
332                )
333
334        try:
335            # Retrieve field instance by name from cached or just-computed
336            # field map.
337            return self.fields_map[field_name]
338        except KeyError:
339            raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")
340
341    def _populate_directed_relation_graph(self) -> Any:
342        """
343        This method is used by each model to find its reverse objects. As this
344        method is very expensive and is accessed frequently (it looks up every
345        field in a model, in every app), it is computed on first access and then
346        is set as a property on every model.
347        """
348        related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)
349
350        all_models = self.models_registry.get_models()
351        for model in all_models:
352            meta = model._model_meta
353
354            fields_with_relations = (
355                f
356                for f in meta._get_fields(reverse=False)
357                if f.is_relation and f.related_model is not None
358            )
359            for f in fields_with_relations:
360                if not isinstance(f.remote_field.model, str):
361                    remote_label = f.remote_field.model.model_options.label
362                    related_objects_graph[remote_label].append(f)
363
364        for model in all_models:
365            # Set the relation_tree using the internal __dict__. In this way
366            # we avoid calling the cached property. In attribute lookup,
367            # __dict__ takes precedence over a data descriptor (such as
368            # @cached_property). This means that the _model_meta._relation_tree is
369            # only called if related_objects is not in __dict__.
370            related_objects = related_objects_graph[model.model_options.label]
371            model._model_meta.__dict__["_relation_tree"] = related_objects
372        # It seems it is possible that self is not in all_models, so guard
373        # against that with default for get().
374        return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
375
376    @cached_property
377    def _relation_tree(self) -> Any:
378        return self._populate_directed_relation_graph()
379
380    def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
381        # This method is usually called by packages.cache_clear(), when the
382        # registry is finalized, or when a new field is added.
383        if forward:
384            for cache_key in self.FORWARD_PROPERTIES:
385                if cache_key in self.__dict__:
386                    delattr(self, cache_key)
387        if reverse:
388            for cache_key in self.REVERSE_PROPERTIES:
389                if cache_key in self.__dict__:
390                    delattr(self, cache_key)
391        self._get_fields_cache = {}
392
393    def get_fields(self, include_reverse: bool = False) -> ImmutableList:
394        """
395        Return a list of fields associated to the model.
396
397        By default, returns only forward fields (fields explicitly defined on
398        this model). Set include_reverse=True to also include reverse relations
399        (fields from other models that point to this model).
400
401        Args:
402            include_reverse: Include reverse relation fields (fields from other
403                           models pointing to this model). Needed for framework
404                           operations like migrations and deletion cascading.
405        """
406        return self._get_fields(reverse=include_reverse)
407
408    def _get_fields(
409        self,
410        *,
411        forward: bool = True,
412        reverse: bool = True,
413        seen_models: set[type[Any]] | None = None,
414    ) -> ImmutableList:
415        """
416        Internal helper function to return fields of the model.
417
418        Args:
419            forward: If True, fields defined on this model are returned.
420            reverse: If True, reverse relations (fields from other models
421                    pointing to this model) are returned.
422            seen_models: Track visited models to prevent duplicates in recursion.
423        """
424
425        # This helper function is used to allow recursion in ``get_fields()``
426        # implementation and to provide a fast way for Plain's internals to
427        # access specific subsets of fields.
428
429        # We must keep track of which models we have already seen. Otherwise we
430        # could include the same field multiple times from different models.
431        topmost_call = seen_models is None
432        if seen_models is None:
433            seen_models = set()
434        seen_models.add(self.model)
435
436        # Creates a cache key composed of all arguments
437        cache_key = (forward, reverse, topmost_call)
438
439        try:
440            # In order to avoid list manipulation. Always return a shallow copy
441            # of the results.
442            return self._get_fields_cache[cache_key]
443        except KeyError:
444            pass
445
446        fields = []
447
448        if reverse:
449            # Tree is computed once and cached until the app cache is expired.
450            # It is composed of a list of fields from other models pointing to
451            # the current model (reverse relations).
452            all_fields = self._relation_tree
453            for field in all_fields:
454                fields.append(field.remote_field)
455
456        if forward:
457            fields += self.local_fields
458            fields += self.local_many_to_many
459
460        # In order to avoid list manipulation. Always
461        # return a shallow copy of the results
462        fields = make_immutable_fields_list("get_fields()", fields)
463
464        # Store result into cache for later access
465        self._get_fields_cache[cache_key] = fields
466        return fields
467
468    @cached_property
469    def _property_names(self) -> frozenset[str]:
470        """Return a set of the names of the properties defined on the model."""
471        names = []
472        for name in dir(self.model):
473            attr = inspect.getattr_static(self.model, name)
474            if isinstance(attr, property):
475                names.append(name)
476        return frozenset(names)
477
478    @cached_property
479    def _non_pk_concrete_field_names(self) -> frozenset[str]:
480        """
481        Return a set of the non-primary key concrete field names defined on the model.
482        """
483        names = []
484        for field in self.concrete_fields:
485            if not field.primary_key:
486                names.append(field.name)
487                if field.name != field.attname:
488                    names.append(field.attname)
489        return frozenset(names)
490
491    @cached_property
492    def db_returning_fields(self) -> list[Field]:
493        """
494        Private API intended only to be used by Plain itself.
495        Fields to be returned after a database insert.
496        """
497        return [
498            field
499            for field in self._get_fields(forward=True, reverse=False)
500            if getattr(field, "db_returning", False)
501        ]