Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import functools
  4import warnings
  5from collections import defaultdict
  6from collections.abc import Callable
  7from functools import partial
  8from typing import TYPE_CHECKING, TypeVar
  9
 10if TYPE_CHECKING:
 11    from plain.models.base import Model
 12
# Type variable used in the signature of the module-level `register_model`
# decorator so that it returns the same type it was given.
M = TypeVar("M", bound="Model")
 14
 15
 16class ModelsRegistryNotReady(Exception):
 17    """The plain.models registry is not populated yet"""
 18
 19    pass
 20
 21
 22class ModelsRegistry:
 23    def __init__(self) -> None:
 24        # Mapping of app labels => model names => model classes. Every time a
 25        # model is imported, ModelBase.__new__ calls packages.register_model which
 26        # creates an entry in all_models. All imported models are registered,
 27        # regardless of whether they're defined in an installed application
 28        # and whether the registry has been populated. Since it isn't possible
 29        # to reimport a module safely (it could reexecute initialization code)
 30        # all_models is never overridden or reset.
 31        self.all_models: defaultdict[str, dict[str, type[Model]]] = defaultdict(dict)
 32
 33        # Maps ("package_label", "modelname") tuples to lists of functions to be
 34        # called when the corresponding model is ready. Used by this class's
 35        # `lazy_model_operation()` and `do_pending_operations()` methods.
 36        self._pending_operations: defaultdict[
 37            tuple[str, str], list[Callable[[type[Model]], None]]
 38        ] = defaultdict(list)
 39
 40        self.ready: bool = False
 41
 42    def check_ready(self) -> None:
 43        """Raise an exception if all models haven't been imported yet."""
 44        if not self.ready:
 45            raise ModelsRegistryNotReady("Models aren't loaded yet.")
 46
 47    # This method is performance-critical at least for Plain's test suite.
 48    @functools.cache
 49    def get_models(self, *, package_label: str = "") -> list[type[Model]]:
 50        """
 51        Return a list of all installed models.
 52
 53        By default, the following models aren't included:
 54
 55        - auto-created models for many-to-many relations without
 56          an explicit intermediate table,
 57
 58        Set the corresponding keyword argument to True to include such models.
 59        """
 60
 61        self.check_ready()
 62
 63        models = []
 64
 65        # Get models for a single package
 66        if package_label:
 67            package_models = self.all_models[package_label]
 68            for model in package_models.values():
 69                models.append(model)
 70            return models
 71
 72        # Get models for all packages
 73        for package_models in self.all_models.values():
 74            for model in package_models.values():
 75                models.append(model)
 76
 77        return models
 78
 79    def get_model(
 80        self,
 81        package_label: str,
 82        model_name: str | None = None,
 83        require_ready: bool = True,
 84    ) -> type[Model]:
 85        """
 86        Return the model matching the given package_label and model_name.
 87
 88        As a shortcut, package_label may be in the form <package_label>.<model_name>.
 89
 90        model_name is case-insensitive.
 91
 92        Raise LookupError if no application exists with this label, or no
 93        model exists with this name in the application. Raise ValueError if
 94        called with a single argument that doesn't contain exactly one dot.
 95        """
 96
 97        if require_ready:
 98            self.check_ready()
 99
100        if model_name is None:
101            package_label, model_name = package_label.split(".")
102
103        package_models = self.all_models[package_label]
104        return package_models[model_name.lower()]
105
106    def register_model(self, package_label: str, model: type[Model]) -> None:
107        # Since this method is called when models are imported, it cannot
108        # perform imports because of the risk of import loops. It mustn't
109        # call get_package_config().
110        model_name = model.model_options.model_name
111        app_models = self.all_models[package_label]
112        if model_name in app_models:
113            if (
114                model.__name__ == app_models[model_name].__name__
115                and model.__module__ == app_models[model_name].__module__
116            ):
117                warnings.warn(
118                    f"Model '{package_label}.{model_name}' was already registered. Reloading models is not "
119                    "advised as it can lead to inconsistencies, most notably with "
120                    "related models.",
121                    RuntimeWarning,
122                    stacklevel=2,
123                )
124            else:
125                raise RuntimeError(
126                    f"Conflicting '{model_name}' models in application '{package_label}': {app_models[model_name]} and {model}."
127                )
128        app_models[model_name] = model
129        self.do_pending_operations(model)
130        self.clear_cache()
131
132    def _get_registered_model(self, package_label: str, model_name: str) -> type[Model]:
133        """
134        Similar to get_model(), but doesn't require that an app exists with
135        the given package_label.
136
137        It's safe to call this method at import time, even while the registry
138        is being populated.
139        """
140        model = self.all_models[package_label].get(model_name.lower())
141        if model is None:
142            raise LookupError(f"Model '{package_label}.{model_name}' not registered.")
143        return model
144
145    def clear_cache(self) -> None:
146        """
147        Clear all internal caches, for methods that alter the app registry.
148
149        This is mostly used in tests.
150        """
151        # Call expire cache on each model. This will purge
152        # the relation tree and the fields cache.
153        self.get_models.cache_clear()
154        if self.ready:
155            # Circumvent self.get_models() to prevent that the cache is refilled.
156            # This particularly prevents that an empty value is cached while cloning.
157            for package_models in self.all_models.values():
158                for model in package_models.values():
159                    model._model_meta._expire_cache()
160
161    def lazy_model_operation(
162        self, function: Callable[..., None], *model_keys: tuple[str, str]
163    ) -> None:
164        """
165        Take a function and a number of ("package_label", "modelname") tuples, and
166        when all the corresponding models have been imported and registered,
167        call the function with the model classes as its arguments.
168
169        The function passed to this method must accept exactly n models as
170        arguments, where n=len(model_keys).
171        """
172        # Base case: no arguments, just execute the function.
173        if not model_keys:
174            function()
175        # Recursive case: take the head of model_keys, wait for the
176        # corresponding model class to be imported and registered, then apply
177        # that argument to the supplied function. Pass the resulting partial
178        # to lazy_model_operation() along with the remaining model args and
179        # repeat until all models are loaded and all arguments are applied.
180        else:
181            next_model, *more_models = model_keys
182
183            # This will be executed after the class corresponding to next_model
184            # has been imported and registered. The `func` attribute provides
185            # duck-type compatibility with partials.
186            def apply_next_model(model: type[Model]) -> None:
187                next_function = partial(apply_next_model.func, model)  # type: ignore[attr-defined]
188                self.lazy_model_operation(next_function, *more_models)
189
190            apply_next_model.func = function  # type: ignore[attr-defined]
191
192            # If the model has already been imported and registered, partially
193            # apply it to the function now. If not, add it to the list of
194            # pending operations for the model, where it will be executed with
195            # the model class as its sole argument once the model is ready.
196            try:
197                model_class = self._get_registered_model(*next_model)
198            except LookupError:
199                self._pending_operations[next_model].append(apply_next_model)
200            else:
201                apply_next_model(model_class)
202
203    def do_pending_operations(self, model: type[Model]) -> None:
204        """
205        Take a newly-prepared model and pass it to each function waiting for
206        it. This is called at the very end of Models.register_model().
207        """
208        key = model.model_options.package_label, model.model_options.model_name
209        for function in self._pending_operations.pop(key, []):
210            function(model)
211
212
# Module-level ModelsRegistry instance, created once at import time.
models_registry = ModelsRegistry()
214
215
# Decorator to register a model (using the internal registry for the correct state).
def register_model(model_class: M) -> M:
    """
    Register `model_class` with the registry reachable through its
    `_model_meta`, under the package label from its `model_options`, and
    return the class unchanged so this can be used as a class decorator.
    """
    registry = model_class._model_meta.models_registry
    label = model_class.model_options.package_label
    registry.register_model(label, model_class)  # type: ignore[arg-type]
    return model_class
221    )
222    return model_class