1from __future__ import annotations
2
3import copy
4import inspect
5from collections import defaultdict
6from collections.abc import Iterable
7from functools import cached_property
8from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
9
10from plain.models.exceptions import FieldDoesNotExist
11from plain.models.query import QuerySet
12from plain.models.registry import models_registry as default_models_registry
13from plain.utils.datastructures import ImmutableList
14
15if TYPE_CHECKING:
16 from plain.models.base import Model
17 from plain.models.fields import Field
18 from plain.models.fields.related import ManyToManyField
19 from plain.models.fields.reverse_related import ForeignObjectRel
20
# Fallback returned by Meta._populate_directed_relation_graph() when the
# instance has no "_relation_tree" entry after the graph has been built.
EMPTY_RELATION_TREE = ()

# Warning template passed to ImmutableList by make_immutable_fields_list();
# "%s" is filled with the name of the property/method whose result is wrapped.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. If you want to manipulate this "
    "list for your own use, make a copy first."
)

# Element type of the lists produced by make_immutable_fields_list().
T = TypeVar("T")
29
30
def make_immutable_fields_list(name: str, data: Iterable[T]) -> ImmutableList[T]:
    """
    Wrap ``data`` in an ImmutableList tagged with a warning about ``name``.

    Args:
        name: Name of the property or method whose result is being wrapped;
            it is interpolated into IMMUTABLE_WARNING.
        data: The items to store in the list.
    """
    warning_text = IMMUTABLE_WARNING % name
    return ImmutableList(data, warning=warning_text)
33
34
35class Meta:
36 """
37 Model metadata descriptor and container.
38
39 Acts as both a descriptor (for lazy initialization and access control)
40 and the actual metadata instance (cached per model class).
41 """
42
    # Names of cached attributes that _expire_cache(forward=True) removes from
    # the instance __dict__.
    # NOTE(review): "base_queryset" is declared with a plain @property in this
    # class (not cached_property), so it is never stored in __dict__ and listing
    # it here has no effect. Conversely, the cached properties
    # "db_returning_fields" and "_property_names" are not listed in either set,
    # so _expire_cache() never clears them - confirm whether that is intended.
    FORWARD_PROPERTIES = {
        "fields",
        "many_to_many",
        "concrete_fields",
        "local_concrete_fields",
        "_non_pk_concrete_field_names",
        "_forward_fields_map",
        "base_queryset",
    }
    # Names of cached attributes that _expire_cache(reverse=True) removes from
    # the instance __dict__.
    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}

    # Type annotations for attributes set in _create_and_cache
    # These exist on cached instances, not on the descriptor itself
    model: type[Model]  # the model class this metadata describes
    models_registry: Any  # registry given to __init__, or the default registry
    _get_fields_cache: dict[Any, Any]  # memoized _get_fields() results
    local_fields: list[Field]  # non-many-to-many fields added via add_field()
    local_many_to_many: list[ManyToManyField]  # ManyToManyFields added via add_field()
61
62 def __init__(self, models_registry: Any | None = None):
63 """
64 Initialize the descriptor with optional configuration.
65
66 This is called ONCE when defining the base Model class.
67 The descriptor then creates cached instances per model subclass.
68 """
69 self._models_registry = models_registry
70 self._cache: dict[type[Model], Meta] = {}
71
72 def __get__(self, instance: Any, owner: type[Model]) -> Meta:
73 """
74 Descriptor protocol - returns cached Meta instance for the model class.
75
76 This is called when accessing Model._model_meta and returns a per-class
77 cached instance created by _create_and_cache().
78
79 Can be accessed from both class and instances:
80 - MyModel._model_meta (class access)
81 - my_instance._model_meta (instance access - returns class's metadata)
82 """
83 # Allow instance access - just return the class's metadata
84 if instance is not None:
85 owner = instance.__class__
86
87 # Skip for the base Model class - return descriptor
88 if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
89 return self
90
91 # Return cached instance or create new one
92 if owner not in self._cache:
93 # Create the instance and cache it BEFORE field contribution
94 # to avoid infinite recursion when fields access cls._model_meta
95 return self._create_and_cache(owner)
96
97 return self._cache[owner]
98
99 def _create_and_cache(self, model: type[Model]) -> Meta:
100 """Create Meta instance and cache it before field contribution."""
101 # Create instance without calling __init__
102 instance = Meta.__new__(Meta)
103
104 # Initialize basic model-specific state
105 instance.model = model
106 instance.models_registry = self._models_registry or default_models_registry
107 instance._get_fields_cache = {}
108 instance.local_fields = []
109 instance.local_many_to_many = []
110
111 # Cache the instance BEFORE processing fields to prevent recursion
112 self._cache[model] = instance
113
114 # Now process fields - they can safely access cls._model_meta
115 seen_attrs = set()
116 for klass in model.__mro__:
117 for attr_name in list(klass.__dict__.keys()):
118 if attr_name.startswith("_") or attr_name in seen_attrs:
119 continue
120 seen_attrs.add(attr_name)
121
122 attr_value = klass.__dict__[attr_name]
123
124 if not inspect.isclass(attr_value) and hasattr(
125 attr_value, "contribute_to_class"
126 ):
127 if attr_name not in model.__dict__:
128 field = copy.deepcopy(attr_value)
129 else:
130 field = attr_value
131 field.contribute_to_class(model, attr_name)
132
133 # Sort fields: primary key first, then alphabetically by name
134 instance.local_fields.sort(key=lambda f: (not f.primary_key, f.name))
135 instance.local_many_to_many.sort(key=lambda f: f.name)
136
137 # Set index names now that fields are contributed
138 # Trigger model_options descriptor to ensure it's initialized
139 # (accessing it will cache the instance)
140 for index in model.model_options.indexes:
141 if not index.name:
142 index.set_name_with_model(model)
143
144 return instance
145
146 @property
147 def base_queryset(self) -> QuerySet:
148 """
149 The base queryset is used by Plain's internal operations like cascading
150 deletes, migrations, and related object lookups. It provides access to
151 all objects in the database without any filtering, ensuring Plain can
152 always see the complete dataset when performing framework operations.
153
154 Unlike user-defined querysets which may filter results (e.g. only active
155 objects), the base queryset must never filter out rows to prevent
156 incomplete results in related queries.
157 """
158 return QuerySet.from_model(self.model)
159
160 def add_field(self, field: Field) -> None:
161 from plain.models.fields.related import ManyToManyField, RelatedField
162
163 if isinstance(field, ManyToManyField):
164 self.local_many_to_many.append(field)
165 else:
166 self.local_fields.append(field)
167
168 # If the field being added is a relation to another known field,
169 # expire the cache on this field and the forward cache on the field
170 # being referenced, because there will be new relationships in the
171 # cache. Otherwise, expire the cache of references *to* this field.
172 # The mechanism for getting at the related model is slightly odd -
173 # ideally, we'd just ask for field.related_model. However, related_model
174 # is a cached property, and all the models haven't been loaded yet, so
175 # we need to make sure we don't cache a string reference.
176 if isinstance(field, RelatedField) and field.remote_field.model:
177 try:
178 field.remote_field.model._model_meta._expire_cache(forward=False)
179 except AttributeError:
180 pass
181 self._expire_cache()
182 else:
183 self._expire_cache(reverse=False)
184
185 @cached_property
186 def fields(self) -> ImmutableList[Field]:
187 from plain.models.fields.related import RelatedField
188
189 """
190 Return a list of all forward fields on the model and its parents,
191 excluding ManyToManyFields.
192
193 Private API intended only to be used by Plain itself; get_fields()
194 combined with filtering of field properties is the public API for
195 obtaining this field list.
196 """
197
198 # For legacy reasons, the fields property should only contain forward
199 # fields that are not private or with a m2m cardinality.
200 def is_not_an_m2m_field(f: Any) -> bool:
201 from plain.models.fields.related import ManyToManyField
202
203 return not isinstance(f, ManyToManyField)
204
205 def is_not_a_generic_relation(f: Any) -> bool:
206 from plain.models.fields.related import ForeignKeyField, ManyToManyField
207
208 # Only ForeignKeyField and ManyToManyField are valid RelatedFields
209 # Anything else is a generic relation
210 if not isinstance(f, RelatedField):
211 return True
212 return isinstance(f, ForeignKeyField | ManyToManyField)
213
214 return make_immutable_fields_list(
215 "fields",
216 (
217 f
218 for f in self._get_fields(reverse=False)
219 if is_not_an_m2m_field(f) and is_not_a_generic_relation(f)
220 ),
221 )
222
223 @cached_property
224 def concrete_fields(self) -> ImmutableList[Field]:
225 """
226 Return a list of all concrete fields on the model and its parents.
227
228 Private API intended only to be used by Plain itself; get_fields()
229 combined with filtering of field properties is the public API for
230 obtaining this field list.
231 """
232 return make_immutable_fields_list(
233 "concrete_fields", (f for f in self.fields if f.concrete)
234 )
235
236 @cached_property
237 def local_concrete_fields(self) -> ImmutableList[Field]:
238 """
239 Return a list of all concrete fields on the model.
240
241 Private API intended only to be used by Plain itself; get_fields()
242 combined with filtering of field properties is the public API for
243 obtaining this field list.
244 """
245 return make_immutable_fields_list(
246 "local_concrete_fields", (f for f in self.local_fields if f.concrete)
247 )
248
249 @cached_property
250 def many_to_many(self) -> ImmutableList[Field]:
251 """
252 Return a list of all many to many fields on the model and its parents.
253
254 Private API intended only to be used by Plain itself; get_fields()
255 combined with filtering of field properties is the public API for
256 obtaining this list.
257 """
258 from plain.models.fields.related import ManyToManyField
259
260 return make_immutable_fields_list(
261 "many_to_many",
262 (
263 f
264 for f in self._get_fields(reverse=False)
265 if isinstance(f, ManyToManyField)
266 ),
267 )
268
269 @cached_property
270 def related_objects(self) -> ImmutableList[ForeignObjectRel]:
271 """
272 Return all related objects pointing to the current model. The related
273 objects can come from a one-to-one, one-to-many, or many-to-many field
274 relation type.
275
276 Private API intended only to be used by Plain itself; get_fields()
277 combined with filtering of field properties is the public API for
278 obtaining this field list.
279 """
280 from plain.models.fields.reverse_related import ForeignKeyRel, ManyToManyRel
281
282 all_related_fields = self._get_fields(forward=False, reverse=True)
283 return make_immutable_fields_list(
284 "related_objects",
285 (
286 obj
287 for obj in all_related_fields
288 if isinstance(obj, ManyToManyRel | ForeignKeyRel)
289 ),
290 )
291
292 @cached_property
293 def _forward_fields_map(self) -> dict[str, Field]:
294 res = {}
295 fields = self._get_fields(reverse=False)
296 for field in fields:
297 res[field.name] = field
298 # Due to the way Plain's internals work, get_field() should also
299 # be able to fetch a field by attname. In the case of a concrete
300 # field with relation, includes the *_id name too
301 try:
302 res[field.attname] = field
303 except AttributeError:
304 pass
305 return res
306
307 @cached_property
308 def fields_map(self) -> dict[str, Field | ForeignObjectRel]:
309 res = {}
310 fields = self._get_fields(forward=False, reverse=True)
311 for field in fields:
312 res[field.name] = field
313 # Due to the way Plain's internals work, get_field() should also
314 # be able to fetch a field by attname. In the case of a concrete
315 # field with relation, includes the *_id name too
316 try:
317 res[field.attname] = field
318 except AttributeError:
319 pass
320 return res
321
322 def get_field(self, field_name: str) -> Field | ForeignObjectRel:
323 """
324 Return a field instance given the name of a forward or reverse field.
325 """
326 try:
327 # In order to avoid premature loading of the relation tree
328 # (expensive) we prefer checking if the field is a forward field.
329 return self._forward_fields_map[field_name]
330 except KeyError:
331 # If the app registry is not ready, reverse fields are
332 # unavailable, therefore we throw a FieldDoesNotExist exception.
333 if not self.models_registry.ready:
334 raise FieldDoesNotExist(
335 f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
336 "so if this is an auto-created related field, it won't "
337 "be available yet."
338 )
339
340 try:
341 # Retrieve field instance by name from cached or just-computed
342 # field map.
343 return self.fields_map[field_name]
344 except KeyError:
345 raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")
346
347 def get_forward_field(self, field_name: str) -> Field:
348 """
349 Return a forward field instance given the field name.
350
351 Raises FieldDoesNotExist if the field doesn't exist or is a reverse relation.
352 """
353 try:
354 return self._forward_fields_map[field_name]
355 except KeyError:
356 raise FieldDoesNotExist(
357 f"{self.model} has no forward field named '{field_name}'"
358 )
359
360 def get_reverse_relation(self, field_name: str) -> ForeignObjectRel:
361 """
362 Return a reverse relation instance given the field name.
363
364 Raises FieldDoesNotExist if the field doesn't exist or is a forward field.
365 """
366 # If the app registry is not ready, reverse fields are unavailable
367 if not self.models_registry.ready:
368 raise FieldDoesNotExist(
369 f"{self.model} has no reverse relation named '{field_name}'. The app cache isn't ready yet."
370 )
371
372 # Check if it's a forward field first
373 if field_name in self._forward_fields_map:
374 raise FieldDoesNotExist(
375 f"'{field_name}' is a forward field, not a reverse relation"
376 )
377
378 try:
379 return self.fields_map[field_name]
380 except KeyError:
381 raise FieldDoesNotExist(
382 f"{self.model} has no reverse relation named '{field_name}'"
383 )
384
385 def _populate_directed_relation_graph(self) -> list[Field]:
386 from plain.models.fields.related import RelatedField
387
388 """
389 This method is used by each model to find its reverse objects. As this
390 method is very expensive and is accessed frequently (it looks up every
391 field in a model, in every app), it is computed on first access and then
392 is set as a property on every model.
393 """
394 related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)
395
396 all_models = self.models_registry.get_models()
397 for model in all_models:
398 meta = model._model_meta
399
400 fields_with_relations = (
401 f
402 for f in meta._get_fields(reverse=False)
403 if isinstance(f, RelatedField)
404 )
405 for f in fields_with_relations:
406 if not isinstance(f.remote_field.model, str):
407 remote_label = f.remote_field.model.model_options.label
408 related_objects_graph[remote_label].append(f)
409
410 for model in all_models:
411 # Set the relation_tree using the internal __dict__. In this way
412 # we avoid calling the cached property. In attribute lookup,
413 # __dict__ takes precedence over a data descriptor (such as
414 # @cached_property). This means that the _model_meta._relation_tree is
415 # only called if related_objects is not in __dict__.
416 related_objects = related_objects_graph[model.model_options.label]
417 model._model_meta.__dict__["_relation_tree"] = related_objects
418 # It seems it is possible that self is not in all_models, so guard
419 # against that with default for get().
420 return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
421
422 @cached_property
423 def _relation_tree(self) -> list[Field]:
424 return self._populate_directed_relation_graph()
425
426 def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
427 # This method is usually called by packages.cache_clear(), when the
428 # registry is finalized, or when a new field is added.
429 if forward:
430 for cache_key in self.FORWARD_PROPERTIES:
431 if cache_key in self.__dict__:
432 delattr(self, cache_key)
433 if reverse:
434 for cache_key in self.REVERSE_PROPERTIES:
435 if cache_key in self.__dict__:
436 delattr(self, cache_key)
437 self._get_fields_cache = {}
438
439 @overload
440 def get_fields(
441 self, include_reverse: Literal[False] = False
442 ) -> ImmutableList[Field]: ...
443
444 @overload
445 def get_fields(
446 self, include_reverse: Literal[True]
447 ) -> ImmutableList[Field | ForeignObjectRel]: ...
448
449 def get_fields(
450 self, include_reverse: bool = False
451 ) -> ImmutableList[Field | ForeignObjectRel]:
452 """
453 Return a list of fields associated to the model.
454
455 By default, returns only forward fields (fields explicitly defined on
456 this model). Set include_reverse=True to also include reverse relations
457 (fields from other models that point to this model).
458
459 Args:
460 include_reverse: Include reverse relation fields (fields from other
461 models pointing to this model). Needed for framework
462 operations like migrations and deletion cascading.
463 """
464 return self._get_fields(reverse=include_reverse)
465
466 @overload
467 def _get_fields(
468 self,
469 *,
470 forward: Literal[True] = True,
471 reverse: Literal[False],
472 seen_models: set[type[Any]] | None = None,
473 ) -> ImmutableList[Field]: ...
474
475 @overload
476 def _get_fields(
477 self,
478 *,
479 forward: Literal[False],
480 reverse: Literal[True] = True,
481 seen_models: set[type[Any]] | None = None,
482 ) -> ImmutableList[ForeignObjectRel]: ...
483
484 @overload
485 def _get_fields(
486 self,
487 *,
488 forward: bool = True,
489 reverse: bool = True,
490 seen_models: set[type[Any]] | None = None,
491 ) -> ImmutableList[Field | ForeignObjectRel]: ...
492
493 def _get_fields(
494 self,
495 *,
496 forward: bool = True,
497 reverse: bool = True,
498 seen_models: set[type[Any]] | None = None,
499 ) -> ImmutableList[Field | ForeignObjectRel]:
500 """
501 Internal helper function to return fields of the model.
502
503 Args:
504 forward: If True, fields defined on this model are returned.
505 reverse: If True, reverse relations (fields from other models
506 pointing to this model) are returned.
507 seen_models: Track visited models to prevent duplicates in recursion.
508 """
509
510 # This helper function is used to allow recursion in ``get_fields()``
511 # implementation and to provide a fast way for Plain's internals to
512 # access specific subsets of fields.
513
514 # We must keep track of which models we have already seen. Otherwise we
515 # could include the same field multiple times from different models.
516 topmost_call = seen_models is None
517 if seen_models is None:
518 seen_models = set()
519 seen_models.add(self.model)
520
521 # Creates a cache key composed of all arguments
522 cache_key = (forward, reverse, topmost_call)
523
524 try:
525 # In order to avoid list manipulation. Always return a shallow copy
526 # of the results.
527 return self._get_fields_cache[cache_key]
528 except KeyError:
529 pass
530
531 fields = []
532
533 if reverse:
534 # Tree is computed once and cached until the app cache is expired.
535 # It is composed of a list of fields from other models pointing to
536 # the current model (reverse relations).
537 all_fields = self._relation_tree
538 for field in all_fields:
539 fields.append(field.remote_field)
540
541 if forward:
542 fields += self.local_fields
543 fields += self.local_many_to_many
544
545 # In order to avoid list manipulation. Always
546 # return a shallow copy of the results
547 fields = make_immutable_fields_list("get_fields()", fields)
548
549 # Store result into cache for later access
550 self._get_fields_cache[cache_key] = fields
551 return fields
552
553 @cached_property
554 def _property_names(self) -> frozenset[str]:
555 """Return a set of the names of the properties defined on the model."""
556 names = []
557 for name in dir(self.model):
558 attr = inspect.getattr_static(self.model, name)
559 if isinstance(attr, property):
560 names.append(name)
561 return frozenset(names)
562
563 @cached_property
564 def _non_pk_concrete_field_names(self) -> frozenset[str]:
565 """
566 Return a set of the non-primary key concrete field names defined on the model.
567 """
568 names = []
569 for field in self.concrete_fields:
570 if not field.primary_key:
571 names.append(field.name)
572 if field.name != field.attname:
573 names.append(field.attname)
574 return frozenset(names)
575
576 @cached_property
577 def db_returning_fields(self) -> list[Field]:
578 """
579 Private API intended only to be used by Plain itself.
580 Fields to be returned after a database insert.
581 """
582 return [
583 field
584 for field in self._get_fields(forward=True, reverse=False)
585 if getattr(field, "db_returning", False)
586 ]