Plain is headed towards 1.0! Subscribe for development updates →

  1import inspect
  2from functools import wraps
  3from importlib import import_module
  4
  5from plain.models.query import QuerySet
  6
  7
  8class BaseManager:
  9    # To retain order, track each time a Manager instance is created.
 10    creation_counter = 0
 11
 12    # Set to True for the 'objects' managers that are automatically created.
 13    auto_created = False
 14
 15    #: If set to True the manager will be serialized into migrations and will
 16    #: thus be available in e.g. RunPython operations.
 17    use_in_migrations = False
 18
 19    def __new__(cls, *args, **kwargs):
 20        # Capture the arguments to make returning them trivial.
 21        obj = super().__new__(cls)
 22        obj._constructor_args = (args, kwargs)
 23        return obj
 24
 25    def __init__(self):
 26        super().__init__()
 27        self._set_creation_counter()
 28        self.model = None
 29        self.name = None
 30        self._hints = {}
 31
 32    def __str__(self):
 33        """Return "package_label.model_label.manager_name"."""
 34        return f"{self.model._meta.label}.{self.name}"
 35
 36    def __class_getitem__(cls, *args, **kwargs):
 37        return cls
 38
 39    def deconstruct(self):
 40        """
 41        Return a 5-tuple of the form (as_manager (True), manager_class,
 42        queryset_class, args, kwargs).
 43
 44        Raise a ValueError if the manager is dynamically generated.
 45        """
 46        qs_class = self._queryset_class
 47        if getattr(self, "_built_with_as_manager", False):
 48            # using MyQuerySet.as_manager()
 49            return (
 50                True,  # as_manager
 51                None,  # manager_class
 52                f"{qs_class.__module__}.{qs_class.__name__}",  # qs_class
 53                None,  # args
 54                None,  # kwargs
 55            )
 56        else:
 57            module_name = self.__module__
 58            name = self.__class__.__name__
 59            # Make sure it's actually there and not an inner class
 60            module = import_module(module_name)
 61            if not hasattr(module, name):
 62                raise ValueError(
 63                    f"Could not find manager {name} in {module_name}.\n"
 64                    "Please note that you need to inherit from managers you "
 65                    "dynamically generated with 'from_queryset()'."
 66                )
 67            return (
 68                False,  # as_manager
 69                f"{module_name}.{name}",  # manager_class
 70                None,  # qs_class
 71                self._constructor_args[0],  # args
 72                self._constructor_args[1],  # kwargs
 73            )
 74
 75    def check(self, **kwargs):
 76        return []
 77
 78    @classmethod
 79    def _get_queryset_methods(cls, queryset_class):
 80        def create_method(name, method):
 81            @wraps(method)
 82            def manager_method(self, *args, **kwargs):
 83                return getattr(self.get_queryset(), name)(*args, **kwargs)
 84
 85            return manager_method
 86
 87        new_methods = {}
 88        for name, method in inspect.getmembers(
 89            queryset_class, predicate=inspect.isfunction
 90        ):
 91            # Only copy missing methods.
 92            if hasattr(cls, name):
 93                continue
 94            # Only copy public methods or methods with the attribute
 95            # queryset_only=False.
 96            queryset_only = getattr(method, "queryset_only", None)
 97            if queryset_only or (queryset_only is None and name.startswith("_")):
 98                continue
 99            # Copy the method onto the manager.
100            new_methods[name] = create_method(name, method)
101        return new_methods
102
103    @classmethod
104    def from_queryset(cls, queryset_class, class_name=None):
105        if class_name is None:
106            class_name = f"{cls.__name__}From{queryset_class.__name__}"
107        return type(
108            class_name,
109            (cls,),
110            {
111                "_queryset_class": queryset_class,
112                **cls._get_queryset_methods(queryset_class),
113            },
114        )
115
116    def contribute_to_class(self, cls, name):
117        self.name = self.name or name
118        self.model = cls
119
120        setattr(cls, name, ManagerDescriptor(self))
121
122        cls._meta.add_manager(self)
123
124    def _set_creation_counter(self):
125        """
126        Set the creation counter value for this instance and increment the
127        class-level copy.
128        """
129        self.creation_counter = BaseManager.creation_counter
130        BaseManager.creation_counter += 1
131
132    #######################
133    # PROXIES TO QUERYSET #
134    #######################
135
136    def get_queryset(self):
137        """
138        Return a new QuerySet object. Subclasses can override this method to
139        customize the behavior of the Manager.
140        """
141        return self._queryset_class(model=self.model, hints=self._hints)
142
143    def all(self):
144        # We can't proxy this method through the `QuerySet` like we do for the
145        # rest of the `QuerySet` methods. This is because `QuerySet.all()`
146        # works by creating a "copy" of the current queryset and in making said
147        # copy, all the cached `prefetch_related` lookups are lost. See the
148        # implementation of `RelatedManager.get_queryset()` for a better
149        # understanding of how this comes into play.
150        return self.get_queryset()
151
152    def __eq__(self, other):
153        return (
154            isinstance(other, self.__class__)
155            and self._constructor_args == other._constructor_args
156        )
157
158    def __hash__(self):
159        return id(self)
160
161
162class Manager(BaseManager.from_queryset(QuerySet)):
163    pass
164
165
166class ManagerDescriptor:
167    def __init__(self, manager):
168        self.manager = manager
169
170    def __get__(self, instance, cls=None):
171        if instance is not None:
172            raise AttributeError(
173                f"Manager isn't accessible via {cls.__name__} instances"
174            )
175
176        return cls._meta.managers_map[self.manager.name]