1from __future__ import annotations
2
3import bisect
4import copy
5import inspect
6from collections import defaultdict
7from functools import cached_property
8from typing import TYPE_CHECKING, Any
9
10from plain.models.exceptions import FieldDoesNotExist
11from plain.models.query import QuerySet
12from plain.models.registry import models_registry as default_models_registry
13from plain.utils.datastructures import ImmutableList
14
15if TYPE_CHECKING:
16 from plain.models.base import Model
17 from plain.models.fields import Field
18
19EMPTY_RELATION_TREE = ()
20
21IMMUTABLE_WARNING = (
22 "The return type of '%s' should never be mutated. If you want to manipulate this "
23 "list for your own use, make a copy first."
24)
25
26
27def make_immutable_fields_list(name: str, data: Any) -> ImmutableList:
28 return ImmutableList(data, warning=IMMUTABLE_WARNING % name)
29
30
31class Meta:
32 """
33 Model metadata descriptor and container.
34
35 Acts as both a descriptor (for lazy initialization and access control)
36 and the actual metadata instance (cached per model class).
37 """
38
39 FORWARD_PROPERTIES = {
40 "fields",
41 "many_to_many",
42 "concrete_fields",
43 "local_concrete_fields",
44 "_non_pk_concrete_field_names",
45 "_forward_fields_map",
46 "base_queryset",
47 }
48 REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}
49
50 # Type annotations for attributes set in _create_and_cache
51 # These exist on cached instances, not on the descriptor itself
52 model: type[Model]
53 models_registry: Any
54 _get_fields_cache: dict[Any, Any]
55 local_fields: list[Field]
56 local_many_to_many: list[Field]
57
58 def __init__(self, models_registry: Any | None = None):
59 """
60 Initialize the descriptor with optional configuration.
61
62 This is called ONCE when defining the base Model class.
63 The descriptor then creates cached instances per model subclass.
64 """
65 self._models_registry = models_registry
66 self._cache: dict[type[Model], Meta] = {}
67
68 def __get__(self, instance: Any, owner: type[Model]) -> Meta:
69 """
70 Descriptor protocol - returns cached Meta instance for the model class.
71
72 This is called when accessing Model._model_meta and returns a per-class
73 cached instance created by _create_and_cache().
74
75 Can be accessed from both class and instances:
76 - MyModel._model_meta (class access)
77 - my_instance._model_meta (instance access - returns class's metadata)
78 """
79 # Allow instance access - just return the class's metadata
80 if instance is not None:
81 owner = instance.__class__
82
83 # Skip for the base Model class - return descriptor
84 if owner.__name__ == "Model" and owner.__module__ == "plain.models.base":
85 return self # type: ignore
86
87 # Return cached instance or create new one
88 if owner not in self._cache:
89 # Create the instance and cache it BEFORE field contribution
90 # to avoid infinite recursion when fields access cls._model_meta
91 return self._create_and_cache(owner)
92
93 return self._cache[owner]
94
95 def _create_and_cache(self, model: type[Model]) -> Meta:
96 """Create Meta instance and cache it before field contribution."""
97 # Create instance without calling __init__
98 instance = Meta.__new__(Meta)
99
100 # Initialize basic model-specific state
101 instance.model = model
102 instance.models_registry = self._models_registry or default_models_registry
103 instance._get_fields_cache = {}
104 instance.local_fields = []
105 instance.local_many_to_many = []
106
107 # Cache the instance BEFORE processing fields to prevent recursion
108 self._cache[model] = instance
109
110 # Now process fields - they can safely access cls._model_meta
111 seen_attrs = set()
112 for klass in model.__mro__:
113 for attr_name in list(klass.__dict__.keys()):
114 if attr_name.startswith("_") or attr_name in seen_attrs:
115 continue
116 seen_attrs.add(attr_name)
117
118 attr_value = klass.__dict__[attr_name]
119
120 if not inspect.isclass(attr_value) and hasattr(
121 attr_value, "contribute_to_class"
122 ):
123 if attr_name not in model.__dict__:
124 field = copy.deepcopy(attr_value)
125 else:
126 field = attr_value
127 field.contribute_to_class(model, attr_name)
128
129 # Set index names now that fields are contributed
130 # Trigger model_options descriptor to ensure it's initialized
131 # (accessing it will cache the instance)
132 for index in model.model_options.indexes:
133 if not index.name:
134 index.set_name_with_model(model)
135
136 return instance
137
138 @property
139 def base_queryset(self) -> QuerySet:
140 """
141 The base queryset is used by Plain's internal operations like cascading
142 deletes, migrations, and related object lookups. It provides access to
143 all objects in the database without any filtering, ensuring Plain can
144 always see the complete dataset when performing framework operations.
145
146 Unlike user-defined querysets which may filter results (e.g. only active
147 objects), the base queryset must never filter out rows to prevent
148 incomplete results in related queries.
149 """
150 return QuerySet.from_model(self.model)
151
152 def add_field(self, field: Field) -> None:
153 # Insert the given field in the order in which it was created, using
154 # the "creation_counter" attribute of the field.
155 # Move many-to-many related fields from self.fields into
156 # self.many_to_many.
157 if field.is_relation and field.many_to_many:
158 bisect.insort(self.local_many_to_many, field)
159 else:
160 bisect.insort(self.local_fields, field)
161
162 # If the field being added is a relation to another known field,
163 # expire the cache on this field and the forward cache on the field
164 # being referenced, because there will be new relationships in the
165 # cache. Otherwise, expire the cache of references *to* this field.
166 # The mechanism for getting at the related model is slightly odd -
167 # ideally, we'd just ask for field.related_model. However, related_model
168 # is a cached property, and all the models haven't been loaded yet, so
169 # we need to make sure we don't cache a string reference.
170 if (
171 field.is_relation
172 and hasattr(field.remote_field, "model")
173 and field.remote_field.model
174 ):
175 try:
176 field.remote_field.model._model_meta._expire_cache(forward=False)
177 except AttributeError:
178 pass
179 self._expire_cache()
180 else:
181 self._expire_cache(reverse=False)
182
183 @cached_property
184 def fields(self) -> ImmutableList:
185 """
186 Return a list of all forward fields on the model and its parents,
187 excluding ManyToManyFields.
188
189 Private API intended only to be used by Plain itself; get_fields()
190 combined with filtering of field properties is the public API for
191 obtaining this field list.
192 """
193
194 # For legacy reasons, the fields property should only contain forward
195 # fields that are not private or with a m2m cardinality. Therefore we
196 # pass these three filters as filters to the generator.
197 # The third lambda is a longwinded way of checking f.related_model - we don't
198 # use that property directly because related_model is a cached property,
199 # and all the models may not have been loaded yet; we don't want to cache
200 # the string reference to the related_model.
201 def is_not_an_m2m_field(f: Any) -> bool:
202 return not (f.is_relation and f.many_to_many)
203
204 def is_not_a_generic_relation(f: Any) -> bool:
205 return not (f.is_relation and f.one_to_many)
206
207 def is_not_a_generic_foreign_key(f: Any) -> bool:
208 return not (
209 f.is_relation
210 and f.many_to_one
211 and not (hasattr(f.remote_field, "model") and f.remote_field.model)
212 )
213
214 return make_immutable_fields_list(
215 "fields",
216 (
217 f
218 for f in self._get_fields(reverse=False)
219 if is_not_an_m2m_field(f)
220 and is_not_a_generic_relation(f)
221 and is_not_a_generic_foreign_key(f)
222 ),
223 )
224
225 @cached_property
226 def concrete_fields(self) -> ImmutableList:
227 """
228 Return a list of all concrete fields on the model and its parents.
229
230 Private API intended only to be used by Plain itself; get_fields()
231 combined with filtering of field properties is the public API for
232 obtaining this field list.
233 """
234 return make_immutable_fields_list(
235 "concrete_fields", (f for f in self.fields if f.concrete)
236 )
237
238 @cached_property
239 def local_concrete_fields(self) -> ImmutableList:
240 """
241 Return a list of all concrete fields on the model.
242
243 Private API intended only to be used by Plain itself; get_fields()
244 combined with filtering of field properties is the public API for
245 obtaining this field list.
246 """
247 return make_immutable_fields_list(
248 "local_concrete_fields", (f for f in self.local_fields if f.concrete)
249 )
250
251 @cached_property
252 def many_to_many(self) -> ImmutableList:
253 """
254 Return a list of all many to many fields on the model and its parents.
255
256 Private API intended only to be used by Plain itself; get_fields()
257 combined with filtering of field properties is the public API for
258 obtaining this list.
259 """
260 return make_immutable_fields_list(
261 "many_to_many",
262 (
263 f
264 for f in self._get_fields(reverse=False)
265 if f.is_relation and f.many_to_many
266 ),
267 )
268
269 @cached_property
270 def related_objects(self) -> ImmutableList:
271 """
272 Return all related objects pointing to the current model. The related
273 objects can come from a one-to-one, one-to-many, or many-to-many field
274 relation type.
275
276 Private API intended only to be used by Plain itself; get_fields()
277 combined with filtering of field properties is the public API for
278 obtaining this field list.
279 """
280 all_related_fields = self._get_fields(forward=False, reverse=True)
281 return make_immutable_fields_list(
282 "related_objects",
283 (obj for obj in all_related_fields if obj.many_to_many or obj.one_to_many),
284 )
285
286 @cached_property
287 def _forward_fields_map(self) -> dict[str, Any]:
288 res = {}
289 fields = self._get_fields(reverse=False)
290 for field in fields:
291 res[field.name] = field
292 # Due to the way Plain's internals work, get_field() should also
293 # be able to fetch a field by attname. In the case of a concrete
294 # field with relation, includes the *_id name too
295 try:
296 res[field.attname] = field
297 except AttributeError:
298 pass
299 return res
300
301 @cached_property
302 def fields_map(self) -> dict[str, Any]:
303 res = {}
304 fields = self._get_fields(forward=False, reverse=True)
305 for field in fields:
306 res[field.name] = field
307 # Due to the way Plain's internals work, get_field() should also
308 # be able to fetch a field by attname. In the case of a concrete
309 # field with relation, includes the *_id name too
310 try:
311 res[field.attname] = field
312 except AttributeError:
313 pass
314 return res
315
316 def get_field(self, field_name: str) -> Any:
317 """
318 Return a field instance given the name of a forward or reverse field.
319 """
320 try:
321 # In order to avoid premature loading of the relation tree
322 # (expensive) we prefer checking if the field is a forward field.
323 return self._forward_fields_map[field_name]
324 except KeyError:
325 # If the app registry is not ready, reverse fields are
326 # unavailable, therefore we throw a FieldDoesNotExist exception.
327 if not self.models_registry.ready:
328 raise FieldDoesNotExist(
329 f"{self.model} has no field named '{field_name}'. The app cache isn't ready yet, "
330 "so if this is an auto-created related field, it won't "
331 "be available yet."
332 )
333
334 try:
335 # Retrieve field instance by name from cached or just-computed
336 # field map.
337 return self.fields_map[field_name]
338 except KeyError:
339 raise FieldDoesNotExist(f"{self.model} has no field named '{field_name}'")
340
341 def _populate_directed_relation_graph(self) -> Any:
342 """
343 This method is used by each model to find its reverse objects. As this
344 method is very expensive and is accessed frequently (it looks up every
345 field in a model, in every app), it is computed on first access and then
346 is set as a property on every model.
347 """
348 related_objects_graph: defaultdict[str, list[Any]] = defaultdict(list)
349
350 all_models = self.models_registry.get_models()
351 for model in all_models:
352 meta = model._model_meta
353
354 fields_with_relations = (
355 f
356 for f in meta._get_fields(reverse=False)
357 if f.is_relation and f.related_model is not None
358 )
359 for f in fields_with_relations:
360 if not isinstance(f.remote_field.model, str):
361 remote_label = f.remote_field.model.model_options.label
362 related_objects_graph[remote_label].append(f)
363
364 for model in all_models:
365 # Set the relation_tree using the internal __dict__. In this way
366 # we avoid calling the cached property. In attribute lookup,
367 # __dict__ takes precedence over a data descriptor (such as
368 # @cached_property). This means that the _model_meta._relation_tree is
369 # only called if related_objects is not in __dict__.
370 related_objects = related_objects_graph[model.model_options.label]
371 model._model_meta.__dict__["_relation_tree"] = related_objects
372 # It seems it is possible that self is not in all_models, so guard
373 # against that with default for get().
374 return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
375
376 @cached_property
377 def _relation_tree(self) -> Any:
378 return self._populate_directed_relation_graph()
379
380 def _expire_cache(self, forward: bool = True, reverse: bool = True) -> None:
381 # This method is usually called by packages.cache_clear(), when the
382 # registry is finalized, or when a new field is added.
383 if forward:
384 for cache_key in self.FORWARD_PROPERTIES:
385 if cache_key in self.__dict__:
386 delattr(self, cache_key)
387 if reverse:
388 for cache_key in self.REVERSE_PROPERTIES:
389 if cache_key in self.__dict__:
390 delattr(self, cache_key)
391 self._get_fields_cache = {}
392
393 def get_fields(self, include_reverse: bool = False) -> ImmutableList:
394 """
395 Return a list of fields associated to the model.
396
397 By default, returns only forward fields (fields explicitly defined on
398 this model). Set include_reverse=True to also include reverse relations
399 (fields from other models that point to this model).
400
401 Args:
402 include_reverse: Include reverse relation fields (fields from other
403 models pointing to this model). Needed for framework
404 operations like migrations and deletion cascading.
405 """
406 return self._get_fields(reverse=include_reverse)
407
408 def _get_fields(
409 self,
410 *,
411 forward: bool = True,
412 reverse: bool = True,
413 seen_models: set[type[Any]] | None = None,
414 ) -> ImmutableList:
415 """
416 Internal helper function to return fields of the model.
417
418 Args:
419 forward: If True, fields defined on this model are returned.
420 reverse: If True, reverse relations (fields from other models
421 pointing to this model) are returned.
422 seen_models: Track visited models to prevent duplicates in recursion.
423 """
424
425 # This helper function is used to allow recursion in ``get_fields()``
426 # implementation and to provide a fast way for Plain's internals to
427 # access specific subsets of fields.
428
429 # We must keep track of which models we have already seen. Otherwise we
430 # could include the same field multiple times from different models.
431 topmost_call = seen_models is None
432 if seen_models is None:
433 seen_models = set()
434 seen_models.add(self.model)
435
436 # Creates a cache key composed of all arguments
437 cache_key = (forward, reverse, topmost_call)
438
439 try:
440 # In order to avoid list manipulation. Always return a shallow copy
441 # of the results.
442 return self._get_fields_cache[cache_key]
443 except KeyError:
444 pass
445
446 fields = []
447
448 if reverse:
449 # Tree is computed once and cached until the app cache is expired.
450 # It is composed of a list of fields from other models pointing to
451 # the current model (reverse relations).
452 all_fields = self._relation_tree
453 for field in all_fields:
454 fields.append(field.remote_field)
455
456 if forward:
457 fields += self.local_fields
458 fields += self.local_many_to_many
459
460 # In order to avoid list manipulation. Always
461 # return a shallow copy of the results
462 fields = make_immutable_fields_list("get_fields()", fields)
463
464 # Store result into cache for later access
465 self._get_fields_cache[cache_key] = fields
466 return fields
467
468 @cached_property
469 def _property_names(self) -> frozenset[str]:
470 """Return a set of the names of the properties defined on the model."""
471 names = []
472 for name in dir(self.model):
473 attr = inspect.getattr_static(self.model, name)
474 if isinstance(attr, property):
475 names.append(name)
476 return frozenset(names)
477
478 @cached_property
479 def _non_pk_concrete_field_names(self) -> frozenset[str]:
480 """
481 Return a set of the non-primary key concrete field names defined on the model.
482 """
483 names = []
484 for field in self.concrete_fields:
485 if not field.primary_key:
486 names.append(field.name)
487 if field.name != field.attname:
488 names.append(field.attname)
489 return frozenset(names)
490
491 @cached_property
492 def db_returning_fields(self) -> list[Field]:
493 """
494 Private API intended only to be used by Plain itself.
495 Fields to be returned after a database insert.
496 """
497 return [
498 field
499 for field in self._get_fields(forward=True, reverse=False)
500 if getattr(field, "db_returning", False)
501 ]