1import bisect
2import inspect
3from collections import defaultdict
4from functools import cached_property
5
6from plain.exceptions import FieldDoesNotExist
7from plain.models import models_registry
8from plain.models.constraints import UniqueConstraint
9from plain.models.db import db_connection
10from plain.models.fields import PrimaryKeyField
11from plain.models.query import QuerySet
12from plain.utils.datastructures import ImmutableList
13
# Unique sentinel object. It is not referenced anywhere else in the visible
# part of this module.
PROXY_PARENTS = object()

# Fallback returned by Options._populate_directed_relation_graph() when the
# registry scan did not store a relation tree on the calling instance.
EMPTY_RELATION_TREE = ()

# Template for the warning attached to immutable field lists; '%s' is replaced
# with the name of the accessor that produced the list.
IMMUTABLE_WARNING = (
    "The return type of '%s' should never be mutated. "
    "If you want to manipulate this list for your own use, make a copy first."
)

# Names of the 'class Meta' attributes that Options.contribute_to_class()
# copies onto the Options instance. Any other public Meta attribute is
# rejected there with a TypeError.
DEFAULT_NAMES = (
    # Table naming
    "db_table",
    "db_table_comment",
    # Query behaviour
    "queryset_class",
    "ordering",
    # Registration
    "package_label",
    "models_registry",
    # Database requirements
    "required_db_features",
    "required_db_vendor",
    # Schema objects
    "indexes",
    "constraints",
)
35
36
def make_immutable_fields_list(name, data):
    """
    Wrap ``data`` in an ImmutableList.

    ``name`` is interpolated into IMMUTABLE_WARNING and the resulting message
    is passed to the list as its ``warning``.
    """
    warning_text = IMMUTABLE_WARNING % name
    return ImmutableList(data, warning=warning_text)
39
40
class Options:
    """
    Per-model metadata container.

    Holds the options read from a model's inner ``class Meta`` together with
    the model's field lists and the cached lookups derived from them. An
    instance is attached to its model class as ``_meta`` by
    contribute_to_class().
    """

    # Attribute names that _expire_cache(forward=True) removes from the
    # instance __dict__ when present.
    # NOTE(review): "base_queryset" and "queryset" are plain properties in this
    # class, so they never appear in the instance __dict__; conversely the
    # cached properties total_unique_constraints, _property_names and
    # db_returning_fields are not listed in either set, so _expire_cache()
    # never clears them -- confirm this is intentional.
    FORWARD_PROPERTIES = {
        "fields",
        "many_to_many",
        "concrete_fields",
        "local_concrete_fields",
        "_non_pk_concrete_field_names",
        "_forward_fields_map",
        "base_queryset",
        "queryset",
    }
    # Attribute names that _expire_cache(reverse=True) removes from the
    # instance __dict__ when present.
    REVERSE_PROPERTIES = {"related_objects", "fields_map", "_relation_tree"}

    # Registry assigned to each instance in __init__(); "models_registry" is
    # one of DEFAULT_NAMES, so 'class Meta' can replace it per model.
    default_models_registry = models_registry
55
56 def __init__(self, meta, package_label=None):
57 self._get_fields_cache = {}
58 self.local_fields = []
59 self.local_many_to_many = []
60 self.queryset_class = None
61 self.model_name = None
62 self.db_table = ""
63 self.db_table_comment = ""
64 self.ordering = []
65 self.indexes = []
66 self.constraints = []
67 self.object_name = None
68 self.package_label = package_label
69 self.required_db_features = []
70 self.required_db_vendor = None
71 self.meta = meta
72
73 # For any non-abstract class, the concrete class is the model
74 # in the end of the proxy_for_model chain. In particular, for
75 # concrete models, the concrete_model is always the class itself.
76 self.concrete_model = None
77
78 # List of all lookups defined in ForeignKey 'limit_choices_to' options
79 # from *other* models. Needed for some admin checks. Internal use only.
80 self.related_fkey_lookups = []
81
82 # A custom app registry to use, if you're making a separate model set.
83 self.models_registry = self.default_models_registry
84
85 @property
86 def label(self):
87 return f"{self.package_label}.{self.object_name}"
88
89 @property
90 def label_lower(self):
91 return f"{self.package_label}.{self.model_name}"
92
93 def contribute_to_class(self, cls, name):
94 from plain.models.backends.utils import truncate_name
95
96 cls._meta = self
97 self.model = cls
98 # First, construct the default values for these options.
99 self.object_name = cls.__name__
100 self.model_name = self.object_name.lower()
101
102 # Store the original user-defined values for each option,
103 # for use when serializing the model definition
104 self.original_attrs = {}
105
106 # Next, apply any overridden values from 'class Meta'.
107 if self.meta:
108 meta_attrs = self.meta.__dict__.copy()
109 for name in self.meta.__dict__:
110 # Ignore any private attributes that Plain doesn't care about.
111 # NOTE: We can't modify a dictionary's contents while looping
112 # over it, so we loop over the *original* dictionary instead.
113 if name.startswith("_"):
114 del meta_attrs[name]
115 for attr_name in DEFAULT_NAMES:
116 if attr_name in meta_attrs:
117 setattr(self, attr_name, meta_attrs.pop(attr_name))
118 self.original_attrs[attr_name] = getattr(self, attr_name)
119 elif hasattr(self.meta, attr_name):
120 setattr(self, attr_name, getattr(self.meta, attr_name))
121 self.original_attrs[attr_name] = getattr(self, attr_name)
122
123 # Package label/class name interpolation for names of constraints and
124 # indexes.
125 for attr_name in {"constraints", "indexes"}:
126 objs = getattr(self, attr_name, [])
127 setattr(self, attr_name, self._format_names_with_class(cls, objs))
128
129 # Any leftover attributes must be invalid.
130 if meta_attrs != {}:
131 raise TypeError(
132 "'class Meta' got invalid attribute(s): {}".format(
133 ",".join(meta_attrs)
134 )
135 )
136
137 del self.meta
138
139 # If the db_table wasn't provided, use the package_label + model_name.
140 if not self.db_table:
141 self.db_table = f"{self.package_label}_{self.model_name}"
142 self.db_table = truncate_name(
143 self.db_table,
144 db_connection.ops.max_name_length(),
145 )
146
147 def _format_names_with_class(self, cls, objs):
148 """Package label/class name interpolation for object names."""
149 new_objs = []
150 for obj in objs:
151 obj = obj.clone()
152 obj.name = obj.name % {
153 "package_label": cls._meta.package_label.lower(),
154 "class": cls.__name__.lower(),
155 }
156 new_objs.append(obj)
157 return new_objs
158
159 def _prepare(self, model):
160 if not any(f.name == "id" for f in self.local_fields):
161 model.add_to_class("id", PrimaryKeyField())
162
163 def add_field(self, field, private=False):
164 # Insert the given field in the order in which it was created, using
165 # the "creation_counter" attribute of the field.
166 # Move many-to-many related fields from self.fields into
167 # self.many_to_many.
168 if field.is_relation and field.many_to_many:
169 bisect.insort(self.local_many_to_many, field)
170 else:
171 bisect.insort(self.local_fields, field)
172
173 # If the field being added is a relation to another known field,
174 # expire the cache on this field and the forward cache on the field
175 # being referenced, because there will be new relationships in the
176 # cache. Otherwise, expire the cache of references *to* this field.
177 # The mechanism for getting at the related model is slightly odd -
178 # ideally, we'd just ask for field.related_model. However, related_model
179 # is a cached property, and all the models haven't been loaded yet, so
180 # we need to make sure we don't cache a string reference.
181 if (
182 field.is_relation
183 and hasattr(field.remote_field, "model")
184 and field.remote_field.model
185 ):
186 try:
187 field.remote_field.model._meta._expire_cache(forward=False)
188 except AttributeError:
189 pass
190 self._expire_cache()
191 else:
192 self._expire_cache(reverse=False)
193
194 def __repr__(self):
195 return f"<Options for {self.object_name}>"
196
197 def __str__(self):
198 return self.label_lower
199
200 def can_migrate(self, connection):
201 """
202 Return True if the model can/should be migrated on the given
203 `connection` object.
204 """
205 if self.required_db_vendor:
206 return self.required_db_vendor == connection.vendor
207 if self.required_db_features:
208 return all(
209 getattr(connection.features, feat, False)
210 for feat in self.required_db_features
211 )
212 return True
213
214 @property
215 def base_queryset(self):
216 """
217 The base queryset is used by Plain's internal operations like cascading
218 deletes, migrations, and related object lookups. It provides access to
219 all objects in the database without any filtering, ensuring Plain can
220 always see the complete dataset when performing framework operations.
221
222 Unlike user-defined querysets which may filter results (e.g. only active
223 objects), the base queryset must never filter out rows to prevent
224 incomplete results in related queries.
225 """
226 return QuerySet(model=self.model)
227
228 @property
229 def queryset(self):
230 if self.queryset_class:
231 return self.queryset_class(model=self.model)
232 return QuerySet(model=self.model)
233
234 @cached_property
235 def fields(self):
236 """
237 Return a list of all forward fields on the model and its parents,
238 excluding ManyToManyFields.
239
240 Private API intended only to be used by Plain itself; get_fields()
241 combined with filtering of field properties is the public API for
242 obtaining this field list.
243 """
244
245 # For legacy reasons, the fields property should only contain forward
246 # fields that are not private or with a m2m cardinality. Therefore we
247 # pass these three filters as filters to the generator.
248 # The third lambda is a longwinded way of checking f.related_model - we don't
249 # use that property directly because related_model is a cached property,
250 # and all the models may not have been loaded yet; we don't want to cache
251 # the string reference to the related_model.
252 def is_not_an_m2m_field(f):
253 return not (f.is_relation and f.many_to_many)
254
255 def is_not_a_generic_relation(f):
256 return not (f.is_relation and f.one_to_many)
257
258 def is_not_a_generic_foreign_key(f):
259 return not (
260 f.is_relation
261 and f.many_to_one
262 and not (hasattr(f.remote_field, "model") and f.remote_field.model)
263 )
264
265 return make_immutable_fields_list(
266 "fields",
267 (
268 f
269 for f in self._get_fields(reverse=False)
270 if is_not_an_m2m_field(f)
271 and is_not_a_generic_relation(f)
272 and is_not_a_generic_foreign_key(f)
273 ),
274 )
275
276 @cached_property
277 def concrete_fields(self):
278 """
279 Return a list of all concrete fields on the model and its parents.
280
281 Private API intended only to be used by Plain itself; get_fields()
282 combined with filtering of field properties is the public API for
283 obtaining this field list.
284 """
285 return make_immutable_fields_list(
286 "concrete_fields", (f for f in self.fields if f.concrete)
287 )
288
289 @cached_property
290 def local_concrete_fields(self):
291 """
292 Return a list of all concrete fields on the model.
293
294 Private API intended only to be used by Plain itself; get_fields()
295 combined with filtering of field properties is the public API for
296 obtaining this field list.
297 """
298 return make_immutable_fields_list(
299 "local_concrete_fields", (f for f in self.local_fields if f.concrete)
300 )
301
302 @cached_property
303 def many_to_many(self):
304 """
305 Return a list of all many to many fields on the model and its parents.
306
307 Private API intended only to be used by Plain itself; get_fields()
308 combined with filtering of field properties is the public API for
309 obtaining this list.
310 """
311 return make_immutable_fields_list(
312 "many_to_many",
313 (
314 f
315 for f in self._get_fields(reverse=False)
316 if f.is_relation and f.many_to_many
317 ),
318 )
319
320 @cached_property
321 def related_objects(self):
322 """
323 Return all related objects pointing to the current model. The related
324 objects can come from a one-to-one, one-to-many, or many-to-many field
325 relation type.
326
327 Private API intended only to be used by Plain itself; get_fields()
328 combined with filtering of field properties is the public API for
329 obtaining this field list.
330 """
331 all_related_fields = self._get_fields(
332 forward=False, reverse=True, include_hidden=True
333 )
334 return make_immutable_fields_list(
335 "related_objects",
336 (
337 obj
338 for obj in all_related_fields
339 if not obj.hidden or obj.field.many_to_many
340 ),
341 )
342
343 @cached_property
344 def _forward_fields_map(self):
345 res = {}
346 fields = self._get_fields(reverse=False)
347 for field in fields:
348 res[field.name] = field
349 # Due to the way Plain's internals work, get_field() should also
350 # be able to fetch a field by attname. In the case of a concrete
351 # field with relation, includes the *_id name too
352 try:
353 res[field.attname] = field
354 except AttributeError:
355 pass
356 return res
357
358 @cached_property
359 def fields_map(self):
360 res = {}
361 fields = self._get_fields(forward=False, include_hidden=True)
362 for field in fields:
363 res[field.name] = field
364 # Due to the way Plain's internals work, get_field() should also
365 # be able to fetch a field by attname. In the case of a concrete
366 # field with relation, includes the *_id name too
367 try:
368 res[field.attname] = field
369 except AttributeError:
370 pass
371 return res
372
373 def get_field(self, field_name):
374 """
375 Return a field instance given the name of a forward or reverse field.
376 """
377 try:
378 # In order to avoid premature loading of the relation tree
379 # (expensive) we prefer checking if the field is a forward field.
380 return self._forward_fields_map[field_name]
381 except KeyError:
382 # If the app registry is not ready, reverse fields are
383 # unavailable, therefore we throw a FieldDoesNotExist exception.
384 if not self.models_registry.ready:
385 raise FieldDoesNotExist(
386 f"{self.object_name} has no field named '{field_name}'. The app cache isn't ready yet, "
387 "so if this is an auto-created related field, it won't "
388 "be available yet."
389 )
390
391 try:
392 # Retrieve field instance by name from cached or just-computed
393 # field map.
394 return self.fields_map[field_name]
395 except KeyError:
396 raise FieldDoesNotExist(
397 f"{self.object_name} has no field named '{field_name}'"
398 )
399
400 def _populate_directed_relation_graph(self):
401 """
402 This method is used by each model to find its reverse objects. As this
403 method is very expensive and is accessed frequently (it looks up every
404 field in a model, in every app), it is computed on first access and then
405 is set as a property on every model.
406 """
407 related_objects_graph = defaultdict(list)
408
409 all_models = self.models_registry.get_models()
410 for model in all_models:
411 opts = model._meta
412
413 fields_with_relations = (
414 f
415 for f in opts._get_fields(reverse=False)
416 if f.is_relation and f.related_model is not None
417 )
418 for f in fields_with_relations:
419 if not isinstance(f.remote_field.model, str):
420 remote_label = f.remote_field.model._meta.concrete_model._meta.label
421 related_objects_graph[remote_label].append(f)
422
423 for model in all_models:
424 # Set the relation_tree using the internal __dict__. In this way
425 # we avoid calling the cached property. In attribute lookup,
426 # __dict__ takes precedence over a data descriptor (such as
427 # @cached_property). This means that the _meta._relation_tree is
428 # only called if related_objects is not in __dict__.
429 related_objects = related_objects_graph[
430 model._meta.concrete_model._meta.label
431 ]
432 model._meta.__dict__["_relation_tree"] = related_objects
433 # It seems it is possible that self is not in all_models, so guard
434 # against that with default for get().
435 return self.__dict__.get("_relation_tree", EMPTY_RELATION_TREE)
436
437 @cached_property
438 def _relation_tree(self):
439 return self._populate_directed_relation_graph()
440
441 def _expire_cache(self, forward=True, reverse=True):
442 # This method is usually called by packages.cache_clear(), when the
443 # registry is finalized, or when a new field is added.
444 if forward:
445 for cache_key in self.FORWARD_PROPERTIES:
446 if cache_key in self.__dict__:
447 delattr(self, cache_key)
448 if reverse:
449 for cache_key in self.REVERSE_PROPERTIES:
450 if cache_key in self.__dict__:
451 delattr(self, cache_key)
452 self._get_fields_cache = {}
453
454 def get_fields(self, include_hidden=False):
455 """
456 Return a list of fields associated to the model. By default, include
457 forward and reverse fields, fields derived from inheritance, but not
458 hidden fields. The returned fields can be changed using the parameters:
459
460 - include_hidden: include fields that have a related_name that
461 starts with a "+"
462 """
463 return self._get_fields(include_hidden=include_hidden)
464
465 def _get_fields(
466 self,
467 forward=True,
468 reverse=True,
469 include_hidden=False,
470 seen_models=None,
471 ):
472 """
473 Internal helper function to return fields of the model.
474 * If forward=True, then fields defined on this model are returned.
475 * If reverse=True, then relations pointing to this model are returned.
476 * If include_hidden=True, then fields with is_hidden=True are returned.
477 """
478
479 # This helper function is used to allow recursion in ``get_fields()``
480 # implementation and to provide a fast way for Plain's internals to
481 # access specific subsets of fields.
482
483 # We must keep track of which models we have already seen. Otherwise we
484 # could include the same field multiple times from different models.
485 topmost_call = seen_models is None
486 if topmost_call:
487 seen_models = set()
488 seen_models.add(self.model)
489
490 # Creates a cache key composed of all arguments
491 cache_key = (forward, reverse, include_hidden, topmost_call)
492
493 try:
494 # In order to avoid list manipulation. Always return a shallow copy
495 # of the results.
496 return self._get_fields_cache[cache_key]
497 except KeyError:
498 pass
499
500 fields = []
501
502 if reverse:
503 # Tree is computed once and cached until the app cache is expired.
504 # It is composed of a list of fields pointing to the current model
505 # from other models.
506 all_fields = self._relation_tree
507 for field in all_fields:
508 # If hidden fields should be included or the relation is not
509 # intentionally hidden, add to the fields dict.
510 if include_hidden or not field.remote_field.hidden:
511 fields.append(field.remote_field)
512
513 if forward:
514 fields += self.local_fields
515 fields += self.local_many_to_many
516
517 # In order to avoid list manipulation. Always
518 # return a shallow copy of the results
519 fields = make_immutable_fields_list("get_fields()", fields)
520
521 # Store result into cache for later access
522 self._get_fields_cache[cache_key] = fields
523 return fields
524
525 @cached_property
526 def total_unique_constraints(self):
527 """
528 Return a list of total unique constraints. Useful for determining set
529 of fields guaranteed to be unique for all rows.
530 """
531 return [
532 constraint
533 for constraint in self.constraints
534 if (
535 isinstance(constraint, UniqueConstraint)
536 and constraint.condition is None
537 and not constraint.contains_expressions
538 )
539 ]
540
541 @cached_property
542 def _property_names(self):
543 """Return a set of the names of the properties defined on the model."""
544 names = []
545 for name in dir(self.model):
546 attr = inspect.getattr_static(self.model, name)
547 if isinstance(attr, property):
548 names.append(name)
549 return frozenset(names)
550
551 @cached_property
552 def _non_pk_concrete_field_names(self):
553 """
554 Return a set of the non-primary key concrete field names defined on the model.
555 """
556 names = []
557 for field in self.concrete_fields:
558 if not field.primary_key:
559 names.append(field.name)
560 if field.name != field.attname:
561 names.append(field.attname)
562 return frozenset(names)
563
564 @cached_property
565 def db_returning_fields(self):
566 """
567 Private API intended only to be used by Plain itself.
568 Fields to be returned after a database insert.
569 """
570 return [
571 field
572 for field in self._get_fields(forward=True, reverse=False)
573 if getattr(field, "db_returning", False)
574 ]