Plain is headed towards 1.0! Subscribe for development updates →

  1import bisect
  2import inspect
  3from collections import defaultdict
  4from functools import cached_property
  5
  6from plain.exceptions import FieldDoesNotExist
  7from plain.models import models_registry
  8from plain.models.constraints import UniqueConstraint
  9from plain.models.db import db_connection
 10from plain.models.fields import PrimaryKeyField
 11from plain.models.query import QuerySet
 12from plain.utils.datastructures import ImmutableList
 13
# Unique sentinel object. It is not referenced anywhere else in the visible
# part of this module.
PROXY_PARENTS = object()

# Fallback value returned by Options._populate_directed_relation_graph() when
# the Options instance has no "_relation_tree" entry after the graph is built.
EMPTY_RELATION_TREE = ()

# Warning template passed to ImmutableList by make_immutable_fields_list();
# the "%s" placeholder is filled with the name of the list being wrapped.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. If you want to manipulate this "
    "list for your own use, make a copy first."
)

# Names of the 'class Meta' attributes that Options.contribute_to_class()
# copies onto the Options instance. Any other public attribute found on
# 'class Meta' makes contribute_to_class() raise TypeError.
DEFAULT_NAMES = (
    "db_table",
    "db_table_comment",
    "queryset_class",
    "ordering",
    "package_label",
    "models_registry",
    "required_db_features",
    "required_db_vendor",
    "indexes",
    "constraints",
)
 35
 36
 37def make_immutable_fields_list(name, data):
 38    return ImmutableList(data, warning=IMMUTABLE_WARNING % name)
 39
 40
 41class Options:
 42    FORWARD_PROPERTIES = {
 43        "fields",
 44        "many_to_many",
 45        "concrete_fields",
 46        "local_concrete_fields",
 47        "_non_pk_concrete_field_names",
 48        "_forward_fields_map",
 49        "base_queryset",
 50        "queryset",
 51    }
 52    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}
 53
 54    default_models_registry = models_registry
 55
 56    def __init__(self, meta, package_label=None):
 57        self._get_fields_cache = {}
 58        self.local_fields = []
 59        self.local_many_to_many = []
 60        self.queryset_class = None
 61        self.model_name = None
 62        self.db_table = ""
 63        self.db_table_comment = ""
 64        self.ordering = []
 65        self.indexes = []
 66        self.constraints = []
 67        self.object_name = None
 68        self.package_label = package_label
 69        self.required_db_features = []
 70        self.required_db_vendor = None
 71        self.meta = meta
 72
 73        # For any non-abstract class, the concrete class is the model
 74        # in the end of the proxy_for_model chain. In particular, for
 75        # concrete models, the concrete_model is always the class itself.
 76        self.concrete_model = None
 77
 78        # List of all lookups defined in ForeignKey 'limit_choices_to' options
 79        # from *other* models. Needed for some admin checks. Internal use only.
 80        self.related_fkey_lookups = []
 81
 82        # A custom app registry to use, if you're making a separate model set.
 83        self.models_registry = self.default_models_registry
 84
 85    @property
 86    def label(self):
 87        return f"{self.package_label}.{self.object_name}"
 88
 89    @property
 90    def label_lower(self):
 91        return f"{self.package_label}.{self.model_name}"
 92
 93    def contribute_to_class(self, cls, name):
 94        from plain.models.backends.utils import truncate_name
 95
 96        cls._meta = self
 97        self.model = cls
 98        # First, construct the default values for these options.
 99        self.object_name = cls.__name__
100        self.model_name = self.object_name.lower()
101
102        # Store the original user-defined values for each option,
103        # for use when serializing the model definition
104        self.original_attrs = {}
105
106        # Next, apply any overridden values from 'class Meta'.
107        if self.meta:
108            meta_attrs = self.meta.__dict__.copy()
109            for name in self.meta.__dict__:
110                # Ignore any private attributes that Plain doesn't care about.
111                # NOTE: We can't modify a dictionary's contents while looping
112                # over it, so we loop over the *original* dictionary instead.
113                if name.startswith("_"):
114                    del meta_attrs[name]
115            for attr_name in DEFAULT_NAMES:
116                if attr_name in meta_attrs:
117                    setattr(self, attr_name, meta_attrs.pop(attr_name))
118                    self.original_attrs[attr_name] = getattr(self, attr_name)
119                elif hasattr(self.meta, attr_name):
120                    setattr(self, attr_name, getattr(self.meta, attr_name))
121                    self.original_attrs[attr_name] = getattr(self, attr_name)
122
123            # Package label/class name interpolation for names of constraints and
124            # indexes.
125            for attr_name in {"constraints", "indexes"}:
126                objs = getattr(self, attr_name, [])
127                setattr(self, attr_name, self._format_names_with_class(cls, objs))
128
129            # Any leftover attributes must be invalid.
130            if meta_attrs != {}:
131                raise TypeError(
132                    "'class Meta' got invalid attribute(s): {}".format(
133                        ",".join(meta_attrs)
134                    )
135                )
136
137        del self.meta
138
139        # If the db_table wasn't provided, use the package_label + model_name.
140        if not self.db_table:
141            self.db_table = f"{self.package_label}_{self.model_name}"
142            self.db_table = truncate_name(
143                self.db_table,
144                db_connection.ops.max_name_length(),
145            )
146
147    def _format_names_with_class(self, cls, objs):
148        """Package label/class name interpolation for object names."""
149        new_objs = []
150        for obj in objs:
151            obj = obj.clone()
152            obj.name = obj.name % {
153                "package_label": cls._meta.package_label.lower(),
154                "class": cls.__name__.lower(),
155            }
156            new_objs.append(obj)
157        return new_objs
158
159    def _prepare(self, model):
160        if not any(f.name == "id" for f in self.local_fields):
161            model.add_to_class("id", PrimaryKeyField())
162
163    def add_field(self, field, private=False):
164        # Insert the given field in the order in which it was created, using
165        # the "creation_counter" attribute of the field.
166        # Move many-to-many related fields from self.fields into
167        # self.many_to_many.
168        if field.is_relation and field.many_to_many:
169            bisect.insort(self.local_many_to_many, field)
170        else:
171            bisect.insort(self.local_fields, field)
172
173        # If the field being added is a relation to another known field,
174        # expire the cache on this field and the forward cache on the field
175        # being referenced, because there will be new relationships in the
176        # cache. Otherwise, expire the cache of references *to* this field.
177        # The mechanism for getting at the related model is slightly odd -
178        # ideally, we'd just ask for field.related_model. However, related_model
179        # is a cached property, and all the models haven't been loaded yet, so
180        # we need to make sure we don't cache a string reference.
181        if (
182            field.is_relation
183            and hasattr(field.remote_field, "model")
184            and field.remote_field.model
185        ):
186            try:
187                field.remote_field.model._meta._expire_cache(forward=False)
188            except AttributeError:
189                pass
190            self._expire_cache()
191        else:
192            self._expire_cache(reverse=False)
193
194    def __repr__(self):
195        return f"<Options for {self.object_name}>"
196
197    def __str__(self):
198        return self.label_lower
199
200    def can_migrate(self, connection):
201        """
202        Return True if the model can/should be migrated on the given
203        `connection` object.
204        """
205        if self.required_db_vendor:
206            return self.required_db_vendor == connection.vendor
207        if self.required_db_features:
208            return all(
209                getattr(connection.features, feat, False)
210                for feat in self.required_db_features
211            )
212        return True
213
214    @property
215    def base_queryset(self):
216        """
217        The base queryset is used by Plain's internal operations like cascading
218        deletes, migrations, and related object lookups. It provides access to
219        all objects in the database without any filtering, ensuring Plain can
220        always see the complete dataset when performing framework operations.
221
222        Unlike user-defined querysets which may filter results (e.g. only active
223        objects), the base queryset must never filter out rows to prevent
224        incomplete results in related queries.
225        """
226        return QuerySet(model=self.model)
227
228    @property
229    def queryset(self):
230        if self.queryset_class:
231            return self.queryset_class(model=self.model)
232        return QuerySet(model=self.model)
233
234    @cached_property
235    def fields(self):
236        """
237        Return a list of all forward fields on the model and its parents,
238        excluding ManyToManyFields.
239
240        Private API intended only to be used by Plain itself; get_fields()
241        combined with filtering of field properties is the public API for
242        obtaining this field list.
243        """
244
245        # For legacy reasons, the fields property should only contain forward
246        # fields that are not private or with a m2m cardinality. Therefore we
247        # pass these three filters as filters to the generator.
248        # The third lambda is a longwinded way of checking f.related_model - we don't
249        # use that property directly because related_model is a cached property,
250        # and all the models may not have been loaded yet; we don't want to cache
251        # the string reference to the related_model.
252        def is_not_an_m2m_field(f):
253            return not (f.is_relation and f.many_to_many)
254
255        def is_not_a_generic_relation(f):
256            return not (f.is_relation and f.one_to_many)
257
258        def is_not_a_generic_foreign_key(f):
259            return not (
260                f.is_relation
261                and f.many_to_one
262                and not (hasattr(f.remote_field, "model") and f.remote_field.model)
263            )
264
265        return make_immutable_fields_list(
266            "fields",
267            (
268                f
269                for f in self._get_fields(reverse=False)
270                if is_not_an_m2m_field(f)
271                and is_not_a_generic_relation(f)
272                and is_not_a_generic_foreign_key(f)
273            ),
274        )
275
276    @cached_property
277    def concrete_fields(self):
278        """
279        Return a list of all concrete fields on the model and its parents.
280
281        Private API intended only to be used by Plain itself; get_fields()
282        combined with filtering of field properties is the public API for
283        obtaining this field list.
284        """
285        return make_immutable_fields_list(
286            "concrete_fields", (f for f in self.fields if f.concrete)
287        )
288
289    @cached_property
290    def local_concrete_fields(self):
291        """
292        Return a list of all concrete fields on the model.
293
294        Private API intended only to be used by Plain itself; get_fields()
295        combined with filtering of field properties is the public API for
296        obtaining this field list.
297        """
298        return make_immutable_fields_list(
299            "local_concrete_fields", (f for f in self.local_fields if f.concrete)
300        )
301
302    @cached_property
303    def many_to_many(self):
304        """
305        Return a list of all many to many fields on the model and its parents.
306
307        Private API intended only to be used by Plain itself; get_fields()
308        combined with filtering of field properties is the public API for
309        obtaining this list.
310        """
311        return make_immutable_fields_list(
312            "many_to_many",
313            (
314                f
315                for f in self._get_fields(reverse=False)
316                if f.is_relation and f.many_to_many
317            ),
318        )
319
320    @cached_property
321    def related_objects(self):
322        """
323        Return all related objects pointing to the current model. The related
324        objects can come from a one-to-one, one-to-many, or many-to-many field
325        relation type.
326
327        Private API intended only to be used by Plain itself; get_fields()
328        combined with filtering of field properties is the public API for
329        obtaining this field list.
330        """
331        all_related_fields = self._get_fields(
332            forward=False, reverse=True, include_hidden=True
333        )
334        return make_immutable_fields_list(
335            "related_objects",
336            (
337                obj
338                for obj in all_related_fields
339                if not obj.hidden or obj.field.many_to_many
340            ),
341        )
342
343    @cached_property
344    def _forward_fields_map(self):
345        res = {}
346        fields = self._get_fields(reverse=False)
347        for field in fields:
348            res[field.name] = field
349            # Due to the way Plain's internals work, get_field() should also
350            # be able to fetch a field by attname. In the case of a concrete
351            # field with relation, includes the *_id name too
352            try:
353                res[field.attname] = field
354            except AttributeError:
355                pass
356        return res
357
358    @cached_property
359    def fields_map(self):
360        res = {}
361        fields = self._get_fields(forward=False, include_hidden=True)
362        for field in fields:
363            res[field.name] = field
364            # Due to the way Plain's internals work, get_field() should also
365            # be able to fetch a field by attname. In the case of a concrete
366            # field with relation, includes the *_id name too
367            try:
368                res[field.attname] = field
369            except AttributeError:
370                pass
371        return res
372
373    def get_field(self, field_name):
374        """
375        Return a field instance given the name of a forward or reverse field.
376        """
377        try:
378            # In order to avoid premature loading of the relation tree
379            # (expensive) we prefer checking if the field is a forward field.
380            return self._forward_fields_map[field_name]
381        except KeyError:
382            # If the app registry is not ready, reverse fields are
383            # unavailable, therefore we throw a FieldDoesNotExist exception.
384            if not self.models_registry.ready:
385                raise FieldDoesNotExist(
386                    f"{self.object_name} has no field named '{field_name}'. The app cache isn't ready yet, "
387                    "so if this is an auto-created related field, it won't "
388                    "be available yet."
389                )
390
391        try:
392            # Retrieve field instance by name from cached or just-computed
393            # field map.
394            return self.fields_map[field_name]
395        except KeyError:
396            raise FieldDoesNotExist(
397                f"{self.object_name} has no field named '{field_name}'"
398            )
399
400    def _populate_directed_relation_graph(self):
401        """
402        This method is used by each model to find its reverse objects. As this
403        method is very expensive and is accessed frequently (it looks up every
404        field in a model, in every app), it is computed on first access and then
405        is set as a property on every model.
406        """
407        related_objects_graph = defaultdict(list)
408
409        all_models = self.models_registry.get_models()
410        for model in all_models:
411            opts = model._meta
412
413            fields_with_relations = (
414                f
415                for f in opts._get_fields(reverse=False)
416                if f.is_relation and f.related_model is not None
417            )
418            for f in fields_with_relations:
419                if not isinstance(f.remote_field.model, str):
420                    remote_label = f.remote_field.model._meta.concrete_model._meta.label
421                    related_objects_graph[remote_label].append(f)
422
423        for model in all_models:
424            # Set the relation_tree using the internal __dict__. In this way
425            # we avoid calling the cached property. In attribute lookup,
426            # __dict__ takes precedence over a data descriptor (such as
427            # @cached_property). This means that the _meta._relation_tree is
428            # only called if related_objects is not in __dict__.
429            related_objects = related_objects_graph[
430                model._meta.concrete_model._meta.label
431            ]
432            model._meta.__dict__["_relation_tree"] = related_objects
433        # It seems it is possible that self is not in all_models, so guard
434        # against that with default for get().
435        return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
436
437    @cached_property
438    def _relation_tree(self):
439        return self._populate_directed_relation_graph()
440
441    def _expire_cache(self, forward=True, reverse=True):
442        # This method is usually called by packages.cache_clear(), when the
443        # registry is finalized, or when a new field is added.
444        if forward:
445            for cache_key in self.FORWARD_PROPERTIES:
446                if cache_key in self.__dict__:
447                    delattr(self, cache_key)
448        if reverse:
449            for cache_key in self.REVERSE_PROPERTIES:
450                if cache_key in self.__dict__:
451                    delattr(self, cache_key)
452        self._get_fields_cache = {}
453
454    def get_fields(self, include_hidden=False):
455        """
456        Return a list of fields associated to the model. By default, include
457        forward and reverse fields, fields derived from inheritance, but not
458        hidden fields. The returned fields can be changed using the parameters:
459
460        - include_hidden:  include fields that have a related_name that
461                           starts with a "+"
462        """
463        return self._get_fields(include_hidden=include_hidden)
464
465    def _get_fields(
466        self,
467        forward=True,
468        reverse=True,
469        include_hidden=False,
470        seen_models=None,
471    ):
472        """
473        Internal helper function to return fields of the model.
474        * If forward=True, then fields defined on this model are returned.
475        * If reverse=True, then relations pointing to this model are returned.
476        * If include_hidden=True, then fields with is_hidden=True are returned.
477        """
478
479        # This helper function is used to allow recursion in ``get_fields()``
480        # implementation and to provide a fast way for Plain's internals to
481        # access specific subsets of fields.
482
483        # We must keep track of which models we have already seen. Otherwise we
484        # could include the same field multiple times from different models.
485        topmost_call = seen_models is None
486        if topmost_call:
487            seen_models = set()
488        seen_models.add(self.model)
489
490        # Creates a cache key composed of all arguments
491        cache_key = (forward, reverse, include_hidden, topmost_call)
492
493        try:
494            # In order to avoid list manipulation. Always return a shallow copy
495            # of the results.
496            return self._get_fields_cache[cache_key]
497        except KeyError:
498            pass
499
500        fields = []
501
502        if reverse:
503            # Tree is computed once and cached until the app cache is expired.
504            # It is composed of a list of fields pointing to the current model
505            # from other models.
506            all_fields = self._relation_tree
507            for field in all_fields:
508                # If hidden fields should be included or the relation is not
509                # intentionally hidden, add to the fields dict.
510                if include_hidden or not field.remote_field.hidden:
511                    fields.append(field.remote_field)
512
513        if forward:
514            fields += self.local_fields
515            fields += self.local_many_to_many
516
517        # In order to avoid list manipulation. Always
518        # return a shallow copy of the results
519        fields = make_immutable_fields_list("get_fields()", fields)
520
521        # Store result into cache for later access
522        self._get_fields_cache[cache_key] = fields
523        return fields
524
525    @cached_property
526    def total_unique_constraints(self):
527        """
528        Return a list of total unique constraints. Useful for determining set
529        of fields guaranteed to be unique for all rows.
530        """
531        return [
532            constraint
533            for constraint in self.constraints
534            if (
535                isinstance(constraint, UniqueConstraint)
536                and constraint.condition is None
537                and not constraint.contains_expressions
538            )
539        ]
540
541    @cached_property
542    def _property_names(self):
543        """Return a set of the names of the properties defined on the model."""
544        names = []
545        for name in dir(self.model):
546            attr = inspect.getattr_static(self.model, name)
547            if isinstance(attr, property):
548                names.append(name)
549        return frozenset(names)
550
551    @cached_property
552    def _non_pk_concrete_field_names(self):
553        """
554        Return a set of the non-primary key concrete field names defined on the model.
555        """
556        names = []
557        for field in self.concrete_fields:
558            if not field.primary_key:
559                names.append(field.name)
560                if field.name != field.attname:
561                    names.append(field.attname)
562        return frozenset(names)
563
564    @cached_property
565    def db_returning_fields(self):
566        """
567        Private API intended only to be used by Plain itself.
568        Fields to be returned after a database insert.
569        """
570        return [
571            field
572            for field in self._get_fields(forward=True, reverse=False)
573            if getattr(field, "db_returning", False)
574        ]