1from __future__ import annotations
2
3import functools
4import warnings
5from collections import defaultdict
6from collections.abc import Callable
7from functools import partial
8from typing import TYPE_CHECKING, TypeVar
9
10if TYPE_CHECKING:
11 from plain.models.base import Model
12
# Type variable bound to model *classes* (the registry stores classes, not
# instances). It lets the `register_model` decorator in this module return
# exactly the class type it was given.
M = TypeVar("M", bound="Model")
15
16
class ModelsRegistryNotReady(Exception):
    """Raised when the plain.models registry is used before it is populated."""
21
22
class ModelsRegistry:
    """
    Registry of model classes, grouped by package label.

    Models are stored in `all_models` under the name found in
    `model.model_options.model_name`; lookups lowercase the requested name
    before searching. Operations that need models which are not registered
    yet can be deferred with `lazy_model_operation()`.
    """

    def __init__(self) -> None:
        # Mapping of package labels => model names => model classes. Every
        # call to register_model() creates an entry here. Nothing in this
        # class ever resets or replaces all_models: since it isn't possible
        # to reimport a module safely (it could reexecute initialization
        # code), registered models are kept for the life of the registry.
        self.all_models: defaultdict[str, dict[str, type[Model]]] = defaultdict(dict)

        # Maps ("package_label", "modelname") tuples to lists of functions to be
        # called when the corresponding model is ready. Used by this class's
        # `lazy_model_operation()` and `do_pending_operations()` methods.
        self._pending_operations: defaultdict[
            tuple[str, str], list[Callable[[type[Model]], None]]
        ] = defaultdict(list)

        # Flipped to True from outside this class once all models are loaded;
        # check_ready() and clear_cache() consult it.
        self.ready: bool = False

    def check_ready(self) -> None:
        """Raise ModelsRegistryNotReady if all models haven't been imported yet."""
        if not self.ready:
            raise ModelsRegistryNotReady("Models aren't loaded yet.")

    # This method is performance-critical at least for Plain's test suite.
    @functools.cache
    def get_models(self, *, package_label: str = "") -> list[type[Model]]:
        """
        Return a list of registered model classes.

        With no arguments, the models of every package are returned. When
        package_label is given (and non-empty), only the models registered
        under that label are returned; an unknown label yields an empty list.

        The result is memoized with functools.cache and invalidated by
        clear_cache(), so repeated calls hand back the same list object.
        Callers should treat the returned list as read-only.

        Raise ModelsRegistryNotReady if the registry isn't ready yet.
        """
        self.check_ready()

        # Get models for a single package.
        if package_label:
            # Use .get() rather than indexing: all_models is a defaultdict, and
            # indexing it with an unknown label would insert an empty entry.
            return list(self.all_models.get(package_label, {}).values())

        # Get models for all packages.
        return [
            model
            for package_models in self.all_models.values()
            for model in package_models.values()
        ]

    def get_model(
        self,
        package_label: str,
        model_name: str | None = None,
        require_ready: bool = True,
    ) -> type[Model]:
        """
        Return the model matching the given package_label and model_name.

        As a shortcut, package_label may be in the form <package_label>.<model_name>.

        model_name is case-insensitive.

        Raise LookupError (specifically KeyError) if no package is registered
        with this label, or no model exists with this name in the package.
        Raise ValueError if called with a single argument that doesn't
        contain exactly one dot. Raise ModelsRegistryNotReady if require_ready
        is true and the registry isn't ready yet.
        """
        if require_ready:
            self.check_ready()

        if model_name is None:
            # Unpacking into two names is what enforces "exactly one dot".
            package_label, model_name = package_label.split(".")

        # .get() keeps a failed lookup from adding an empty package entry to
        # the all_models defaultdict; the KeyError for a missing model is the
        # same one plain indexing would raise.
        return self.all_models.get(package_label, {})[model_name.lower()]

    def register_model(self, package_label: str, model: type[Model]) -> None:
        """
        Register `model` under `package_label`, then run any operations that
        were waiting for it and clear the registry caches.

        Re-registering a class with the same name and module emits a
        RuntimeWarning and replaces the entry; a different class with the same
        model name raises RuntimeError.
        """
        # Since this method is called when models are imported, it cannot
        # perform imports because of the risk of import loops. It mustn't
        # call get_package_config().
        model_name = model.model_options.model_name
        # Indexing (not .get()) is deliberate here: registering is the one
        # place that should create the package's entry.
        app_models = self.all_models[package_label]
        if model_name in app_models:
            existing = app_models[model_name]
            if (
                model.__name__ == existing.__name__
                and model.__module__ == existing.__module__
            ):
                warnings.warn(
                    f"Model '{package_label}.{model_name}' was already registered. Reloading models is not "
                    "advised as it can lead to inconsistencies, most notably with "
                    "related models.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            else:
                raise RuntimeError(
                    f"Conflicting '{model_name}' models in application '{package_label}': {existing} and {model}."
                )
        app_models[model_name] = model
        self.do_pending_operations(model)
        self.clear_cache()

    def _get_registered_model(self, package_label: str, model_name: str) -> type[Model]:
        """
        Similar to get_model(), but never checks that the registry is ready.

        It's safe to call this method at import time, even while the registry
        is being populated.

        Raise LookupError if no model is registered under that label and name.
        """
        # .get() on the outer mapping avoids inserting an empty entry into the
        # all_models defaultdict for packages that aren't registered (yet).
        model = self.all_models.get(package_label, {}).get(model_name.lower())
        if model is None:
            raise LookupError(f"Model '{package_label}.{model_name}' not registered.")
        return model

    def clear_cache(self) -> None:
        """
        Clear all internal caches, for methods that alter the registry.

        This is mostly used in tests.
        """
        self.get_models.cache_clear()
        if self.ready:
            # Call expire cache on each model. This will purge
            # the relation tree and the fields cache.
            # Circumvent self.get_models() to prevent that the cache is refilled.
            # This particularly prevents that an empty value is cached while cloning.
            for package_models in self.all_models.values():
                for model in package_models.values():
                    model._model_meta._expire_cache()

    def lazy_model_operation(
        self, function: Callable[..., None], *model_keys: tuple[str, str]
    ) -> None:
        """
        Take a function and a number of ("package_label", "modelname") tuples, and
        when all the corresponding models have been imported and registered,
        call the function with the model classes as its arguments.

        The function passed to this method must accept exactly n models as
        arguments, where n=len(model_keys).
        """
        # Base case: no arguments, just execute the function.
        if not model_keys:
            function()
        # Recursive case: take the head of model_keys, wait for the
        # corresponding model class to be imported and registered, then apply
        # that argument to the supplied function. Pass the resulting partial
        # to lazy_model_operation() along with the remaining model args and
        # repeat until all models are loaded and all arguments are applied.
        else:
            next_model, *more_models = model_keys

            # This will be executed after the class corresponding to next_model
            # has been imported and registered.
            def apply_next_model(model: type[Model]) -> None:
                next_function = partial(function, model)
                self.lazy_model_operation(next_function, *more_models)

            # If the model has already been imported and registered, partially
            # apply it to the function now. If not, add it to the list of
            # pending operations for the model, where it will be executed with
            # the model class as its sole argument once the model is ready.
            try:
                model_class = self._get_registered_model(*next_model)
            except LookupError:
                self._pending_operations[next_model].append(apply_next_model)
            else:
                apply_next_model(model_class)

    def do_pending_operations(self, model: type[Model]) -> None:
        """
        Take a newly-prepared model and pass it to each function waiting for
        it. This is called at the very end of ModelsRegistry.register_model().

        The waiting functions are looked up by the package label and model
        name stored on `model.model_options`, and are removed from the pending
        list before being called.
        """
        key = model.model_options.package_label, model.model_options.model_name
        for function in self._pending_operations.pop(key, []):
            function(model)
209
210
# Module-level registry instance, created once when this module is imported.
models_registry = ModelsRegistry()
212
213
def register_model(model_class: type[M]) -> type[M]:
    """
    Class decorator that registers a model and returns it unchanged.

    Registration goes through the registry referenced by the class's own
    `_model_meta.models_registry` (using the internal registry for the correct
    state), under the package label found in `model_options.package_label`.
    """
    register = model_class._model_meta.models_registry.register_model
    register(model_class.model_options.package_label, model_class)
    return model_class