Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import functools
  4import warnings
  5from collections import defaultdict
  6from collections.abc import Callable
  7from functools import partial
  8from typing import TYPE_CHECKING, TypeVar
  9
 10if TYPE_CHECKING:
 11    from plain.models.base import Model
 12
# Model classes are registered, not instances.
# M is bound to Model so that the register_model() decorator in this module
# can be typed as returning the same model class type it receives.
M = TypeVar("M", bound="Model")
 15
 16
 17class ModelsRegistryNotReady(Exception):
 18    """The plain.models registry is not populated yet"""
 19
 20    pass
 21
 22
 23class ModelsRegistry:
 24    def __init__(self) -> None:
 25        # Mapping of app labels => model names => model classes. Every time a
 26        # model is imported, ModelBase.__new__ calls packages.register_model which
 27        # creates an entry in all_models. All imported models are registered,
 28        # regardless of whether they're defined in an installed application
 29        # and whether the registry has been populated. Since it isn't possible
 30        # to reimport a module safely (it could reexecute initialization code)
 31        # all_models is never overridden or reset.
 32        self.all_models: defaultdict[str, dict[str, type[Model]]] = defaultdict(dict)
 33
 34        # Maps ("package_label", "modelname") tuples to lists of functions to be
 35        # called when the corresponding model is ready. Used by this class's
 36        # `lazy_model_operation()` and `do_pending_operations()` methods.
 37        self._pending_operations: defaultdict[
 38            tuple[str, str], list[Callable[[type[Model]], None]]
 39        ] = defaultdict(list)
 40
 41        self.ready: bool = False
 42
 43    def check_ready(self) -> None:
 44        """Raise an exception if all models haven't been imported yet."""
 45        if not self.ready:
 46            raise ModelsRegistryNotReady("Models aren't loaded yet.")
 47
 48    # This method is performance-critical at least for Plain's test suite.
 49    @functools.cache
 50    def get_models(self, *, package_label: str = "") -> list[type[Model]]:
 51        """
 52        Return a list of all installed models.
 53
 54        By default, the following models aren't included:
 55
 56        - auto-created models for many-to-many relations without
 57          an explicit intermediate table,
 58
 59        Set the corresponding keyword argument to True to include such models.
 60        """
 61
 62        self.check_ready()
 63
 64        models = []
 65
 66        # Get models for a single package
 67        if package_label:
 68            package_models = self.all_models[package_label]
 69            for model in package_models.values():
 70                models.append(model)
 71            return models
 72
 73        # Get models for all packages
 74        for package_models in self.all_models.values():
 75            for model in package_models.values():
 76                models.append(model)
 77
 78        return models
 79
 80    def get_model(
 81        self,
 82        package_label: str,
 83        model_name: str | None = None,
 84        require_ready: bool = True,
 85    ) -> type[Model]:
 86        """
 87        Return the model matching the given package_label and model_name.
 88
 89        As a shortcut, package_label may be in the form <package_label>.<model_name>.
 90
 91        model_name is case-insensitive.
 92
 93        Raise LookupError if no application exists with this label, or no
 94        model exists with this name in the application. Raise ValueError if
 95        called with a single argument that doesn't contain exactly one dot.
 96        """
 97
 98        if require_ready:
 99            self.check_ready()
100
101        if model_name is None:
102            package_label, model_name = package_label.split(".")
103
104        package_models = self.all_models[package_label]
105        return package_models[model_name.lower()]
106
107    def register_model(self, package_label: str, model: type[Model]) -> None:
108        # Since this method is called when models are imported, it cannot
109        # perform imports because of the risk of import loops. It mustn't
110        # call get_package_config().
111        model_name = model.model_options.model_name
112        app_models = self.all_models[package_label]
113        if model_name in app_models:
114            if (
115                model.__name__ == app_models[model_name].__name__
116                and model.__module__ == app_models[model_name].__module__
117            ):
118                warnings.warn(
119                    f"Model '{package_label}.{model_name}' was already registered. Reloading models is not "
120                    "advised as it can lead to inconsistencies, most notably with "
121                    "related models.",
122                    RuntimeWarning,
123                    stacklevel=2,
124                )
125            else:
126                raise RuntimeError(
127                    f"Conflicting '{model_name}' models in application '{package_label}': {app_models[model_name]} and {model}."
128                )
129        app_models[model_name] = model
130        self.do_pending_operations(model)
131        self.clear_cache()
132
133    def _get_registered_model(self, package_label: str, model_name: str) -> type[Model]:
134        """
135        Similar to get_model(), but doesn't require that an app exists with
136        the given package_label.
137
138        It's safe to call this method at import time, even while the registry
139        is being populated.
140        """
141        model = self.all_models[package_label].get(model_name.lower())
142        if model is None:
143            raise LookupError(f"Model '{package_label}.{model_name}' not registered.")
144        return model
145
146    def clear_cache(self) -> None:
147        """
148        Clear all internal caches, for methods that alter the app registry.
149
150        This is mostly used in tests.
151        """
152        # Call expire cache on each model. This will purge
153        # the relation tree and the fields cache.
154        self.get_models.cache_clear()
155        if self.ready:
156            # Circumvent self.get_models() to prevent that the cache is refilled.
157            # This particularly prevents that an empty value is cached while cloning.
158            for package_models in self.all_models.values():
159                for model in package_models.values():
160                    model._model_meta._expire_cache()
161
162    def lazy_model_operation(
163        self, function: Callable[..., None], *model_keys: tuple[str, str]
164    ) -> None:
165        """
166        Take a function and a number of ("package_label", "modelname") tuples, and
167        when all the corresponding models have been imported and registered,
168        call the function with the model classes as its arguments.
169
170        The function passed to this method must accept exactly n models as
171        arguments, where n=len(model_keys).
172        """
173        # Base case: no arguments, just execute the function.
174        if not model_keys:
175            function()
176        # Recursive case: take the head of model_keys, wait for the
177        # corresponding model class to be imported and registered, then apply
178        # that argument to the supplied function. Pass the resulting partial
179        # to lazy_model_operation() along with the remaining model args and
180        # repeat until all models are loaded and all arguments are applied.
181        else:
182            next_model, *more_models = model_keys
183
184            # This will be executed after the class corresponding to next_model
185            # has been imported and registered.
186            def apply_next_model(model: type[Model]) -> None:
187                next_function = partial(function, model)
188                self.lazy_model_operation(next_function, *more_models)
189
190            # If the model has already been imported and registered, partially
191            # apply it to the function now. If not, add it to the list of
192            # pending operations for the model, where it will be executed with
193            # the model class as its sole argument once the model is ready.
194            try:
195                model_class = self._get_registered_model(*next_model)
196            except LookupError:
197                self._pending_operations[next_model].append(apply_next_model)
198            else:
199                apply_next_model(model_class)
200
201    def do_pending_operations(self, model: type[Model]) -> None:
202        """
203        Take a newly-prepared model and pass it to each function waiting for
204        it. This is called at the very end of Models.register_model().
205        """
206        key = model.model_options.package_label, model.model_options.model_name
207        for function in self._pending_operations.pop(key, []):
208            function(model)
209
210
# Module-level registry instance, created when this module is imported.
models_registry = ModelsRegistry()
212
213
# Decorator to register a model (using the internal registry for the correct state).
def register_model(model_class: type[M]) -> type[M]:
    """
    Register `model_class` with the registry reachable through its
    `_model_meta.models_registry`, under the class's own package label,
    and return the class unchanged so this can be used as a decorator.
    """
    register = model_class._model_meta.models_registry.register_model
    register(model_class.model_options.package_label, model_class)
    return model_class