Plain is headed towards 1.0! Subscribe for development updates →

  1import copy
  2import inspect
  3from functools import wraps
  4from importlib import import_module
  5
  6from plain.models.db import router
  7from plain.models.query import QuerySet
  8
  9
 10class BaseManager:
 11    # To retain order, track each time a Manager instance is created.
 12    creation_counter = 0
 13
 14    # Set to True for the 'objects' managers that are automatically created.
 15    auto_created = False
 16
 17    #: If set to True the manager will be serialized into migrations and will
 18    #: thus be available in e.g. RunPython operations.
 19    use_in_migrations = False
 20
 21    def __new__(cls, *args, **kwargs):
 22        # Capture the arguments to make returning them trivial.
 23        obj = super().__new__(cls)
 24        obj._constructor_args = (args, kwargs)
 25        return obj
 26
 27    def __init__(self):
 28        super().__init__()
 29        self._set_creation_counter()
 30        self.model = None
 31        self.name = None
 32        self._db = None
 33        self._hints = {}
 34
 35    def __str__(self):
 36        """Return "package_label.model_label.manager_name"."""
 37        return f"{self.model._meta.label}.{self.name}"
 38
 39    def __class_getitem__(cls, *args, **kwargs):
 40        return cls
 41
 42    def deconstruct(self):
 43        """
 44        Return a 5-tuple of the form (as_manager (True), manager_class,
 45        queryset_class, args, kwargs).
 46
 47        Raise a ValueError if the manager is dynamically generated.
 48        """
 49        qs_class = self._queryset_class
 50        if getattr(self, "_built_with_as_manager", False):
 51            # using MyQuerySet.as_manager()
 52            return (
 53                True,  # as_manager
 54                None,  # manager_class
 55                f"{qs_class.__module__}.{qs_class.__name__}",  # qs_class
 56                None,  # args
 57                None,  # kwargs
 58            )
 59        else:
 60            module_name = self.__module__
 61            name = self.__class__.__name__
 62            # Make sure it's actually there and not an inner class
 63            module = import_module(module_name)
 64            if not hasattr(module, name):
 65                raise ValueError(
 66                    f"Could not find manager {name} in {module_name}.\n"
 67                    "Please note that you need to inherit from managers you "
 68                    "dynamically generated with 'from_queryset()'."
 69                )
 70            return (
 71                False,  # as_manager
 72                f"{module_name}.{name}",  # manager_class
 73                None,  # qs_class
 74                self._constructor_args[0],  # args
 75                self._constructor_args[1],  # kwargs
 76            )
 77
 78    def check(self, **kwargs):
 79        return []
 80
 81    @classmethod
 82    def _get_queryset_methods(cls, queryset_class):
 83        def create_method(name, method):
 84            @wraps(method)
 85            def manager_method(self, *args, **kwargs):
 86                return getattr(self.get_queryset(), name)(*args, **kwargs)
 87
 88            return manager_method
 89
 90        new_methods = {}
 91        for name, method in inspect.getmembers(
 92            queryset_class, predicate=inspect.isfunction
 93        ):
 94            # Only copy missing methods.
 95            if hasattr(cls, name):
 96                continue
 97            # Only copy public methods or methods with the attribute
 98            # queryset_only=False.
 99            queryset_only = getattr(method, "queryset_only", None)
100            if queryset_only or (queryset_only is None and name.startswith("_")):
101                continue
102            # Copy the method onto the manager.
103            new_methods[name] = create_method(name, method)
104        return new_methods
105
106    @classmethod
107    def from_queryset(cls, queryset_class, class_name=None):
108        if class_name is None:
109            class_name = f"{cls.__name__}From{queryset_class.__name__}"
110        return type(
111            class_name,
112            (cls,),
113            {
114                "_queryset_class": queryset_class,
115                **cls._get_queryset_methods(queryset_class),
116            },
117        )
118
119    def contribute_to_class(self, cls, name):
120        self.name = self.name or name
121        self.model = cls
122
123        setattr(cls, name, ManagerDescriptor(self))
124
125        cls._meta.add_manager(self)
126
127    def _set_creation_counter(self):
128        """
129        Set the creation counter value for this instance and increment the
130        class-level copy.
131        """
132        self.creation_counter = BaseManager.creation_counter
133        BaseManager.creation_counter += 1
134
135    def db_manager(self, using=None, hints=None):
136        obj = copy.copy(self)
137        obj._db = using or self._db
138        obj._hints = hints or self._hints
139        return obj
140
141    @property
142    def db(self):
143        return self._db or router.db_for_read(self.model, **self._hints)
144
145    #######################
146    # PROXIES TO QUERYSET #
147    #######################
148
149    def get_queryset(self):
150        """
151        Return a new QuerySet object. Subclasses can override this method to
152        customize the behavior of the Manager.
153        """
154        return self._queryset_class(model=self.model, using=self._db, hints=self._hints)
155
156    def all(self):
157        # We can't proxy this method through the `QuerySet` like we do for the
158        # rest of the `QuerySet` methods. This is because `QuerySet.all()`
159        # works by creating a "copy" of the current queryset and in making said
160        # copy, all the cached `prefetch_related` lookups are lost. See the
161        # implementation of `RelatedManager.get_queryset()` for a better
162        # understanding of how this comes into play.
163        return self.get_queryset()
164
165    def __eq__(self, other):
166        return (
167            isinstance(other, self.__class__)
168            and self._constructor_args == other._constructor_args
169        )
170
171    def __hash__(self):
172        return id(self)
173
174
175class Manager(BaseManager.from_queryset(QuerySet)):
176    pass
177
178
179class ManagerDescriptor:
180    def __init__(self, manager):
181        self.manager = manager
182
183    def __get__(self, instance, cls=None):
184        if instance is not None:
185            raise AttributeError(
186                f"Manager isn't accessible via {cls.__name__} instances"
187            )
188
189        return cls._meta.managers_map[self.manager.name]