1from __future__ import annotations
2
3import functools
4import warnings
5from collections import defaultdict
6from collections.abc import Callable
7from functools import partial
8from typing import TYPE_CHECKING, TypeVar
9
10if TYPE_CHECKING:
11 from plain.models.base import Model
12
13M = TypeVar("M", bound="Model")
14
15
16class ModelsRegistryNotReady(Exception):
17 """The plain.models registry is not populated yet"""
18
19 pass
20
21
22class ModelsRegistry:
23 def __init__(self) -> None:
24 # Mapping of app labels => model names => model classes. Every time a
25 # model is imported, ModelBase.__new__ calls packages.register_model which
26 # creates an entry in all_models. All imported models are registered,
27 # regardless of whether they're defined in an installed application
28 # and whether the registry has been populated. Since it isn't possible
29 # to reimport a module safely (it could reexecute initialization code)
30 # all_models is never overridden or reset.
31 self.all_models: defaultdict[str, dict[str, type[Model]]] = defaultdict(dict)
32
33 # Maps ("package_label", "modelname") tuples to lists of functions to be
34 # called when the corresponding model is ready. Used by this class's
35 # `lazy_model_operation()` and `do_pending_operations()` methods.
36 self._pending_operations: defaultdict[
37 tuple[str, str], list[Callable[[type[Model]], None]]
38 ] = defaultdict(list)
39
40 self.ready: bool = False
41
42 def check_ready(self) -> None:
43 """Raise an exception if all models haven't been imported yet."""
44 if not self.ready:
45 raise ModelsRegistryNotReady("Models aren't loaded yet.")
46
47 # This method is performance-critical at least for Plain's test suite.
48 @functools.cache
49 def get_models(self, *, package_label: str = "") -> list[type[Model]]:
50 """
51 Return a list of all installed models.
52
53 By default, the following models aren't included:
54
55 - auto-created models for many-to-many relations without
56 an explicit intermediate table,
57
58 Set the corresponding keyword argument to True to include such models.
59 """
60
61 self.check_ready()
62
63 models = []
64
65 # Get models for a single package
66 if package_label:
67 package_models = self.all_models[package_label]
68 for model in package_models.values():
69 models.append(model)
70 return models
71
72 # Get models for all packages
73 for package_models in self.all_models.values():
74 for model in package_models.values():
75 models.append(model)
76
77 return models
78
79 def get_model(
80 self,
81 package_label: str,
82 model_name: str | None = None,
83 require_ready: bool = True,
84 ) -> type[Model]:
85 """
86 Return the model matching the given package_label and model_name.
87
88 As a shortcut, package_label may be in the form <package_label>.<model_name>.
89
90 model_name is case-insensitive.
91
92 Raise LookupError if no application exists with this label, or no
93 model exists with this name in the application. Raise ValueError if
94 called with a single argument that doesn't contain exactly one dot.
95 """
96
97 if require_ready:
98 self.check_ready()
99
100 if model_name is None:
101 package_label, model_name = package_label.split(".")
102
103 package_models = self.all_models[package_label]
104 return package_models[model_name.lower()]
105
106 def register_model(self, package_label: str, model: type[Model]) -> None:
107 # Since this method is called when models are imported, it cannot
108 # perform imports because of the risk of import loops. It mustn't
109 # call get_package_config().
110 model_name = model.model_options.model_name
111 app_models = self.all_models[package_label]
112 if model_name in app_models:
113 if (
114 model.__name__ == app_models[model_name].__name__
115 and model.__module__ == app_models[model_name].__module__
116 ):
117 warnings.warn(
118 f"Model '{package_label}.{model_name}' was already registered. Reloading models is not "
119 "advised as it can lead to inconsistencies, most notably with "
120 "related models.",
121 RuntimeWarning,
122 stacklevel=2,
123 )
124 else:
125 raise RuntimeError(
126 f"Conflicting '{model_name}' models in application '{package_label}': {app_models[model_name]} and {model}."
127 )
128 app_models[model_name] = model
129 self.do_pending_operations(model)
130 self.clear_cache()
131
132 def _get_registered_model(self, package_label: str, model_name: str) -> type[Model]:
133 """
134 Similar to get_model(), but doesn't require that an app exists with
135 the given package_label.
136
137 It's safe to call this method at import time, even while the registry
138 is being populated.
139 """
140 model = self.all_models[package_label].get(model_name.lower())
141 if model is None:
142 raise LookupError(f"Model '{package_label}.{model_name}' not registered.")
143 return model
144
145 def clear_cache(self) -> None:
146 """
147 Clear all internal caches, for methods that alter the app registry.
148
149 This is mostly used in tests.
150 """
151 # Call expire cache on each model. This will purge
152 # the relation tree and the fields cache.
153 self.get_models.cache_clear()
154 if self.ready:
155 # Circumvent self.get_models() to prevent that the cache is refilled.
156 # This particularly prevents that an empty value is cached while cloning.
157 for package_models in self.all_models.values():
158 for model in package_models.values():
159 model._model_meta._expire_cache()
160
161 def lazy_model_operation(
162 self, function: Callable[..., None], *model_keys: tuple[str, str]
163 ) -> None:
164 """
165 Take a function and a number of ("package_label", "modelname") tuples, and
166 when all the corresponding models have been imported and registered,
167 call the function with the model classes as its arguments.
168
169 The function passed to this method must accept exactly n models as
170 arguments, where n=len(model_keys).
171 """
172 # Base case: no arguments, just execute the function.
173 if not model_keys:
174 function()
175 # Recursive case: take the head of model_keys, wait for the
176 # corresponding model class to be imported and registered, then apply
177 # that argument to the supplied function. Pass the resulting partial
178 # to lazy_model_operation() along with the remaining model args and
179 # repeat until all models are loaded and all arguments are applied.
180 else:
181 next_model, *more_models = model_keys
182
183 # This will be executed after the class corresponding to next_model
184 # has been imported and registered. The `func` attribute provides
185 # duck-type compatibility with partials.
186 def apply_next_model(model: type[Model]) -> None:
187 next_function = partial(apply_next_model.func, model) # type: ignore[attr-defined]
188 self.lazy_model_operation(next_function, *more_models)
189
190 apply_next_model.func = function # type: ignore[attr-defined]
191
192 # If the model has already been imported and registered, partially
193 # apply it to the function now. If not, add it to the list of
194 # pending operations for the model, where it will be executed with
195 # the model class as its sole argument once the model is ready.
196 try:
197 model_class = self._get_registered_model(*next_model)
198 except LookupError:
199 self._pending_operations[next_model].append(apply_next_model)
200 else:
201 apply_next_model(model_class)
202
203 def do_pending_operations(self, model: type[Model]) -> None:
204 """
205 Take a newly-prepared model and pass it to each function waiting for
206 it. This is called at the very end of Models.register_model().
207 """
208 key = model.model_options.package_label, model.model_options.model_name
209 for function in self._pending_operations.pop(key, []):
210 function(model)
211
212
213models_registry = ModelsRegistry()
214
215
216# Decorator to register a model (using the internal registry for the correct state).
217def register_model(model_class: M) -> M:
218 model_class._model_meta.models_registry.register_model(
219 model_class.model_options.package_label,
220 model_class, # type: ignore[arg-type]
221 )
222 return model_class