1from __future__ import annotations
2
3import bisect
4import copy
5import inspect
6from collections import defaultdict
7from functools import cached_property
8from typing import TYPE_CHECKING, Any
9
10from plain.models.exceptions import FieldDoesNotExist
11from plain.models.query import QuerySet
12from plain.models.registry import models_registry as default_models_registry
13from plain.utils.datastructures import ImmutableList
14
15if TYPE_CHECKING:
16 from plain.models.base import Model
17 from plain.models.fields import Field
18
# Fallback returned by Meta._populate_directed_relation_graph() when no
# "_relation_tree" entry was stored for the model being asked about.
EMPTY_RELATION_TREE = tuple()

# %-style template with a single "%s" placeholder; make_immutable_fields_list()
# fills it with the name of the accessor whose result must not be mutated.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. "
    "If you want to manipulate this list for your own use, make a copy first."
)
25
26
def make_immutable_fields_list(name: str, data: Any) -> ImmutableList:
    """
    Wrap ``data`` in an ImmutableList.

    ``name`` is interpolated into IMMUTABLE_WARNING and the resulting message
    is handed to the ImmutableList as its ``warning`` argument.
    """
    warning_message = IMMUTABLE_WARNING % name
    return ImmutableList(data, warning=warning_message)
29
30
class Meta:
    """
    Model metadata descriptor and container.

    Acts as both a descriptor (for lazy initialization and access control)
    and the actual metadata instance (cached per model class).
    """

    # Names of cached attributes dropped by _expire_cache(forward=True).
    FORWARD_PROPERTIES = {
        "fields",
        "many_to_many",
        "concrete_fields",
        "local_concrete_fields",
        "_non_pk_concrete_field_names",
        "_forward_fields_map",
        "base_queryset",
        # Computed from the forward fields, so it has to be recomputed
        # whenever a field is added.
        "db_returning_fields",
    }
    # Names of cached attributes dropped by _expire_cache(reverse=True).
    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}

    # Type annotations for attributes set in _create_and_cache
    # These exist on cached instances, not on the descriptor itself
    model: type[Model]
    models_registry: Any
    _get_fields_cache: dict[Any, Any]
    local_fields: list[Field]
    local_many_to_many: list[Field]
    related_fkey_lookups: list[Any]

    def __init__(self, models_registry: Any | None = None):
        """
        Initialize the descriptor with optional configuration.

        This is called ONCE when defining the base Model class.
        The descriptor then creates cached instances per model subclass.

        models_registry: registry handed to every per-model instance; when it
        is falsy, the default models registry is used instead.
        """
        self._models_registry = models_registry
        self._cache: dict[type[Model], Meta] = {}

    def __get__(self, instance: Any, owner: type[Model]) -> Meta:
        """
        Descriptor protocol - returns cached Meta instance for the model class.

        This is called when accessing Model._model_meta and returns a per-class
        cached instance created by _create_and_cache().

        Can be accessed from both class and instances:
        - MyModel._model_meta (class access)
        - my_instance._model_meta (instance access - returns class's metadata)
        """
        # Allow instance access - just return the class's metadata
        if instance is not None:
            owner = instance.__class__

        # Skip for the base Model class - return descriptor
        if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
            return self  # type: ignore

        # Return cached instance or create new one
        if owner not in self._cache:
            # _create_and_cache() stores the instance BEFORE field
            # contribution to avoid infinite recursion when fields access
            # cls._model_meta
            return self._create_and_cache(owner)

        return self._cache[owner]

    def _create_and_cache(self, model: type[Model]) -> Meta:
        """
        Create the Meta instance for ``model``, cache it, then contribute fields.

        The instance is stored in the descriptor's cache before any field's
        contribute_to_class() runs, so fields can read model._model_meta
        without re-entering this method.
        """
        # Create instance without calling __init__
        instance = Meta.__new__(Meta)

        # Initialize basic model-specific state
        instance.model = model
        instance.models_registry = self._models_registry or default_models_registry
        instance._get_fields_cache = {}
        instance.local_fields = []
        instance.local_many_to_many = []
        instance.related_fkey_lookups = []

        # Cache the instance BEFORE processing fields to prevent recursion
        self._cache[model] = instance

        # Now process fields - they can safely access cls._model_meta.
        # Walking the MRO front to back and remembering every public name
        # means the most-derived definition of a name wins, even when that
        # definition is not a field.
        seen_attrs: set[str] = set()
        for klass in model.__mro__:
            # Iterate over a snapshot of the names: presumably
            # contribute_to_class() may set attributes on the class while we
            # are looping.
            for attr_name in list(klass.__dict__):
                if attr_name.startswith("_") or attr_name in seen_attrs:
                    continue
                seen_attrs.add(attr_name)

                attr_value = klass.__dict__[attr_name]

                if not inspect.isclass(attr_value) and hasattr(
                    attr_value, "contribute_to_class"
                ):
                    if attr_name not in model.__dict__:
                        # Inherited attribute: contribute a private copy so
                        # the object on the parent class is left untouched.
                        field = copy.deepcopy(attr_value)
                    else:
                        field = attr_value
                    field.contribute_to_class(model, attr_name)

        # Set index names now that fields are contributed
        # Trigger model_options descriptor to ensure it's initialized
        # (accessing it will cache the instance)
        for index in model.model_options.indexes:
            if not index.name:
                index.set_name_with_model(model)

        return instance

    @property
    def base_queryset(self) -> QuerySet:
        """
        The base queryset is used by Plain's internal operations like cascading
        deletes, migrations, and related object lookups. It provides access to
        all objects in the database without any filtering, ensuring Plain can
        always see the complete dataset when performing framework operations.

        Unlike user-defined querysets which may filter results (e.g. only active
        objects), the base queryset must never filter out rows to prevent
        incomplete results in related queries.
        """
        return QuerySet.from_model(self.model)

    def add_field(self, field: Field) -> None:
        """
        Register ``field`` on this model's metadata and expire affected caches.

        Many-to-many relations are stored in local_many_to_many, everything
        else in local_fields; both lists are kept sorted via bisect.insort.
        """
        # Insert the given field in the order in which it was created, using
        # the "creation_counter" attribute of the field.
        # Move many-to-many related fields from self.fields into
        # self.many_to_many.
        if field.is_relation and field.many_to_many:
            bisect.insort(self.local_many_to_many, field)
        else:
            bisect.insort(self.local_fields, field)

        # If the field being added is a relation to another known field,
        # expire the cache on this field and the forward cache on the field
        # being referenced, because there will be new relationships in the
        # cache. Otherwise, expire the cache of references *to* this field.
        # The mechanism for getting at the related model is slightly odd -
        # ideally, we'd just ask for field.related_model. However, related_model
        # is a cached property, and all the models haven't been loaded yet, so
        # we need to make sure we don't cache a string reference.
        if (
            field.is_relation
            and hasattr(field.remote_field, "model")
            and field.remote_field.model
        ):
            try:
                field.remote_field.model._model_meta._expire_cache(forward=False)
            except AttributeError:
                # Best effort: the remote "model" has no usable _model_meta
                # (presumably still an unresolved string reference).
                pass
            self._expire_cache()
        else:
            self._expire_cache(reverse=False)

    @cached_property
    def fields(self) -> ImmutableList:
        """
        Return a list of all forward fields on the model and its parents,
        excluding ManyToManyFields.

        Private API intended only to be used by Plain itself; get_fields()
        combined with filtering of field properties is the public API for
        obtaining this field list.
        """

        # For legacy reasons, the fields property should only contain forward
        # fields that are not private or with a m2m cardinality. Therefore we
        # pass these three filters as filters to the generator.
        # The third filter is a longwinded way of checking f.related_model - we
        # don't use that property directly because related_model is a cached
        # property, and all the models may not have been loaded yet; we don't
        # want to cache the string reference to the related_model.
        def is_not_an_m2m_field(f: Any) -> bool:
            return not (f.is_relation and f.many_to_many)

        def is_not_a_generic_relation(f: Any) -> bool:
            return not (f.is_relation and f.one_to_many)

        def is_not_a_generic_foreign_key(f: Any) -> bool:
            return not (
                f.is_relation
                and f.many_to_one
                and not (hasattr(f.remote_field, "model") and f.remote_field.model)
            )

        return make_immutable_fields_list(
            "fields",
            (
                f
                for f in self._get_fields(reverse=False)
                if is_not_an_m2m_field(f)
                and is_not_a_generic_relation(f)
                and is_not_a_generic_foreign_key(f)
            ),
        )

    @cached_property
    def concrete_fields(self) -> ImmutableList:
        """
        Return a list of all concrete fields on the model and its parents.

        Private API intended only to be used by Plain itself; get_fields()
        combined with filtering of field properties is the public API for
        obtaining this field list.
        """
        return make_immutable_fields_list(
            "concrete_fields", (f for f in self.fields if f.concrete)
        )

    @cached_property
    def local_concrete_fields(self) -> ImmutableList:
        """
        Return a list of all concrete fields on the model.

        Private API intended only to be used by Plain itself; get_fields()
        combined with filtering of field properties is the public API for
        obtaining this field list.
        """
        return make_immutable_fields_list(
            "local_concrete_fields", (f for f in self.local_fields if f.concrete)
        )

    @cached_property
    def many_to_many(self) -> ImmutableList:
        """
        Return a list of all many to many fields on the model and its parents.

        Private API intended only to be used by Plain itself; get_fields()
        combined with filtering of field properties is the public API for
        obtaining this list.
        """
        return make_immutable_fields_list(
            "many_to_many",
            (
                f
                for f in self._get_fields(reverse=False)
                if f.is_relation and f.many_to_many
            ),
        )

    @cached_property
    def related_objects(self) -> ImmutableList:
        """
        Return all related objects pointing to the current model. The related
        objects can come from a one-to-one, one-to-many, or many-to-many field
        relation type.

        Private API intended only to be used by Plain itself; get_fields()
        combined with filtering of field properties is the public API for
        obtaining this field list.
        """
        all_related_fields = self._get_fields(
            forward=False, reverse=True, include_hidden=True
        )
        return make_immutable_fields_list(
            "related_objects",
            (
                obj
                for obj in all_related_fields
                if not obj.hidden or obj.field.many_to_many
            ),
        )

    @staticmethod
    def _build_fields_map(fields: Any) -> dict[str, Any]:
        """Map every field's name, and its attname when it has one, to the field."""
        res: dict[str, Any] = {}
        for field in fields:
            res[field.name] = field
            # Due to the way Plain's internals work, get_field() should also
            # be able to fetch a field by attname. In the case of a concrete
            # field with relation, includes the *_id name too
            try:
                res[field.attname] = field
            except AttributeError:
                # Not every object returned by _get_fields() has an attname.
                pass
        return res

    @cached_property
    def _forward_fields_map(self) -> dict[str, Any]:
        """Name/attname lookup table for the forward fields of the model."""
        return self._build_fields_map(self._get_fields(reverse=False))

    @cached_property
    def fields_map(self) -> dict[str, Any]:
        """
        Name/attname lookup table for the reverse relations of the model,
        hidden ones included.
        """
        return self._build_fields_map(
            self._get_fields(forward=False, include_hidden=True)
        )

    def get_field(self, field_name: str) -> Any:
        """
        Return a field instance given the name of a forward or reverse field.

        Raises FieldDoesNotExist when no forward field matches and either the
        models registry is not ready yet or no reverse field matches.
        """
        try:
            # In order to avoid premature loading of the relation tree
            # (expensive) we prefer checking if the field is a forward field.
            return self._forward_fields_map[field_name]
        except KeyError:
            # If the app registry is not ready, reverse fields are
            # unavailable, therefore we throw a FieldDoesNotExist exception.
            if not self.models_registry.ready:
                raise FieldDoesNotExist(
                    f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
                    "so if this is an auto-created related field, it won't "
                    "be available yet."
                )

        try:
            # Retrieve field instance by name from cached or just-computed
            # field map.
            return self.fields_map[field_name]
        except KeyError:
            raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")

    def _populate_directed_relation_graph(self) -> Any:
        """
        This method is used by each model to find its reverse objects. As this
        method is very expensive and is accessed frequently (it looks up every
        field in a model, in every app), it is computed on first access and then
        is set as a property on every model.
        """
        related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)

        all_models = self.models_registry.get_models()
        for model in all_models:
            meta = model._model_meta

            fields_with_relations = (
                f
                for f in meta._get_fields(reverse=False)
                if f.is_relation and f.related_model is not None
            )
            for f in fields_with_relations:
                # Relations whose target is still a string are skipped.
                if not isinstance(f.remote_field.model, str):
                    remote_label = f.remote_field.model.model_options.label
                    related_objects_graph[remote_label].append(f)

        for model in all_models:
            # Set the relation tree using the internal __dict__. In this way
            # we avoid calling the cached property. In attribute lookup, the
            # instance __dict__ takes precedence over a non-data descriptor
            # (such as @cached_property). This means that the
            # _model_meta._relation_tree property body only runs if
            # "_relation_tree" is not in __dict__.
            related_objects = related_objects_graph[model.model_options.label]
            model._model_meta.__dict__["_relation_tree"] = related_objects
        # It seems it is possible that self is not in all_models, so guard
        # against that with default for get().
        return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)

    @cached_property
    def _relation_tree(self) -> Any:
        """Fields on other models that point at this model (computed lazily)."""
        return self._populate_directed_relation_graph()

    def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
        """
        Drop cached attributes so they are recomputed on next access.

        forward: drop every name listed in FORWARD_PROPERTIES.
        reverse: drop every name listed in REVERSE_PROPERTIES.
        The _get_fields() result cache is always reset.
        """
        # This method is usually called by packages.cache_clear(), when the
        # registry is finalized, or when a new field is added.
        cached_values = self.__dict__
        if forward:
            for cache_key in self.FORWARD_PROPERTIES:
                cached_values.pop(cache_key, None)
        if reverse:
            for cache_key in self.REVERSE_PROPERTIES:
                cached_values.pop(cache_key, None)
        self._get_fields_cache = {}

    def get_fields(self, include_hidden: bool = False) -> ImmutableList:
        """
        Return a list of fields associated to the model. By default, include
        forward and reverse fields, fields derived from inheritance, but not
        hidden fields. The returned fields can be changed using the parameters:

        - include_hidden: include fields that have a related_name that
          starts with a "+"
        """
        return self._get_fields(include_hidden=include_hidden)

    def _get_fields(
        self,
        forward: bool = True,
        reverse: bool = True,
        include_hidden: bool = False,
        seen_models: set[type[Any]] | None = None,
    ) -> ImmutableList:
        """
        Internal helper function to return fields of the model.
        * If forward=True, then fields defined on this model are returned.
        * If reverse=True, then relations pointing to this model are returned.
        * If include_hidden=True, then reverse relations whose ``hidden``
          attribute is true are returned as well.

        Results are memoized per argument combination in _get_fields_cache;
        the cached ImmutableList object itself is returned on later calls.
        """

        # This helper function is used to allow recursion in ``get_fields()``
        # implementation and to provide a fast way for Plain's internals to
        # access specific subsets of fields.

        # We must keep track of which models we have already seen. Otherwise we
        # could include the same field multiple times from different models.
        topmost_call = seen_models is None
        if seen_models is None:
            seen_models = set()
        seen_models.add(self.model)

        # Creates a cache key composed of all arguments
        cache_key = (forward, reverse, include_hidden, topmost_call)

        try:
            # The cached value is an ImmutableList, so the same object can be
            # handed out again without copying.
            return self._get_fields_cache[cache_key]
        except KeyError:
            pass

        collected: list[Any] = []

        if reverse:
            # Tree is computed once and cached until the app cache is expired.
            # It is composed of a list of fields pointing to the current model
            # from other models.
            for field in self._relation_tree:
                # If hidden fields should be included or the relation is not
                # intentionally hidden, add to the collected fields.
                if include_hidden or not field.remote_field.hidden:
                    collected.append(field.remote_field)

        if forward:
            collected += self.local_fields
            collected += self.local_many_to_many

        # Wrap the result in an ImmutableList so callers cannot modify the
        # cached value in place.
        result = make_immutable_fields_list("get_fields()", collected)

        # Store result into cache for later access
        self._get_fields_cache[cache_key] = result
        return result

    @cached_property
    def _property_names(self) -> frozenset[str]:
        """Return a set of the names of the properties defined on the model."""
        # getattr_static() looks the attribute up without invoking descriptors,
        # so the property objects themselves are inspected.
        return frozenset(
            name
            for name in dir(self.model)
            if isinstance(inspect.getattr_static(self.model, name), property)
        )

    @cached_property
    def _non_pk_concrete_field_names(self) -> frozenset[str]:
        """
        Return a set of the non-primary key concrete field names defined on the model.

        Both the name and the attname of each such field are included.
        """
        names: set[str] = set()
        for field in self.concrete_fields:
            if not field.primary_key:
                names.add(field.name)
                names.add(field.attname)
        return frozenset(names)

    @cached_property
    def db_returning_fields(self) -> list[Field]:
        """
        Private API intended only to be used by Plain itself.
        Fields to be returned after a database insert.

        These are the forward fields whose ``db_returning`` attribute is true.
        """
        return [
            field
            for field in self._get_fields(forward=True, reverse=False)
            if getattr(field, "db_returning", False)
        ]