1import inspect
2from functools import wraps
3from importlib import import_module
4
5from plain.models.query import QuerySet
6
7
class BaseManager:
    """
    Base class for model managers.

    A manager is bound to a model class through ``contribute_to_class()`` and
    builds querysets for it through ``get_queryset()``. Concrete manager
    classes are normally produced with ``from_queryset()``, which sets
    ``_queryset_class`` and copies the queryset's public methods onto the
    manager as thin proxies.
    """

    # To retain order, track each time a Manager instance is created.
    creation_counter = 0

    # Set to True for the 'objects' managers that are automatically created.
    auto_created = False

    #: If set to True the manager will be serialized into migrations and will
    #: thus be available in e.g. RunPython operations.
    use_in_migrations = False

    def __new__(cls, *args, **kwargs):
        """Create the instance and remember the arguments it was built with."""
        # Capture the arguments to make returning them trivial.
        obj = super().__new__(cls)
        obj._constructor_args = (args, kwargs)
        return obj

    def __init__(self):
        """Assign a creation counter and start unbound (no model, no name)."""
        super().__init__()
        self._set_creation_counter()
        self.model = None
        self.name = None
        # Passed through unchanged to the queryset class in get_queryset().
        self._hints = {}

    def __str__(self):
        """Return "package_label.model_label.manager_name"."""
        return f"{self.model._meta.label}.{self.name}"

    def __class_getitem__(cls, *args, **kwargs):
        """Return the class itself, so subscripting the class is a no-op."""
        return cls

    def deconstruct(self):
        """
        Return a 5-tuple of the form (as_manager (True), manager_class,
        queryset_class, args, kwargs).

        Raise a ValueError if the manager is dynamically generated.
        """
        if getattr(self, "_built_with_as_manager", False):
            # using MyQuerySet.as_manager()
            # The queryset class is only needed in this branch, so it is only
            # looked up here. Manager classes that never went through
            # from_queryset() (and thus have no ``_queryset_class``) can still
            # be deconstructed by the code below.
            qs_class = self._queryset_class
            return (
                True,  # as_manager
                None,  # manager_class
                f"{qs_class.__module__}.{qs_class.__name__}",  # qs_class
                None,  # args
                None,  # kwargs
            )

        module_name = self.__module__
        name = self.__class__.__name__
        # Make sure it's actually there and not an inner class
        module = import_module(module_name)
        if not hasattr(module, name):
            raise ValueError(
                f"Could not find manager {name} in {module_name}.\n"
                "Please note that you need to inherit from managers you "
                "dynamically generated with 'from_queryset()'."
            )
        return (
            False,  # as_manager
            f"{module_name}.{name}",  # manager_class
            None,  # qs_class
            self._constructor_args[0],  # args
            self._constructor_args[1],  # kwargs
        )

    def check(self, **kwargs):
        """Return an empty list; any keyword arguments are ignored."""
        return []

    @classmethod
    def _get_queryset_methods(cls, queryset_class):
        """
        Build proxy methods for the functions defined on ``queryset_class``.

        Return a dict mapping method name to a manager method that forwards
        the call to the same-named method of ``self.get_queryset()``.
        """

        def create_method(name, method):
            # ``wraps`` copies the queryset method's metadata onto the proxy.
            @wraps(method)
            def manager_method(self, *args, **kwargs):
                return getattr(self.get_queryset(), name)(*args, **kwargs)

            return manager_method

        new_methods = {}
        for name, method in inspect.getmembers(
            queryset_class, predicate=inspect.isfunction
        ):
            # Only copy missing methods.
            if hasattr(cls, name):
                continue
            # Only copy public methods or methods with the attribute
            # queryset_only=False.
            queryset_only = getattr(method, "queryset_only", None)
            if queryset_only or (queryset_only is None and name.startswith("_")):
                continue
            # Copy the method onto the manager.
            new_methods[name] = create_method(name, method)
        return new_methods

    @classmethod
    def from_queryset(cls, queryset_class, class_name=None):
        """
        Return a new subclass of ``cls`` that proxies ``queryset_class``.

        The subclass stores ``queryset_class`` as ``_queryset_class`` and gets
        the proxy methods from ``_get_queryset_methods()``. When ``class_name``
        is None the name defaults to "<cls name>From<queryset class name>".
        """
        if class_name is None:
            class_name = f"{cls.__name__}From{queryset_class.__name__}"
        return type(
            class_name,
            (cls,),
            {
                "_queryset_class": queryset_class,
                **cls._get_queryset_methods(queryset_class),
            },
        )

    def contribute_to_class(self, cls, name):
        """
        Bind this manager to the model class ``cls`` under attribute ``name``.

        An already-set ``self.name`` is kept. The class attribute ``name`` is
        replaced with a ``ManagerDescriptor`` and the manager is registered
        through ``cls._meta.add_manager()``.
        """
        self.name = self.name or name
        self.model = cls

        setattr(cls, name, ManagerDescriptor(self))

        cls._meta.add_manager(self)

    def _set_creation_counter(self):
        """
        Set the creation counter value for this instance and increment the
        class-level copy.
        """
        self.creation_counter = BaseManager.creation_counter
        BaseManager.creation_counter += 1

    #######################
    # PROXIES TO QUERYSET #
    #######################

    def get_queryset(self):
        """
        Return a new QuerySet object. Subclasses can override this method to
        customize the behavior of the Manager.
        """
        return self._queryset_class(model=self.model, hints=self._hints)

    def all(self):
        """Return the queryset produced by ``get_queryset()``."""
        # We can't proxy this method through the `QuerySet` like we do for the
        # rest of the `QuerySet` methods. This is because `QuerySet.all()`
        # works by creating a "copy" of the current queryset and in making said
        # copy, all the cached `prefetch_related` lookups are lost. See the
        # implementation of `RelatedManager.get_queryset()` for a better
        # understanding of how this comes into play.
        return self.get_queryset()

    def __eq__(self, other):
        """Equal when ``other`` is an instance of this manager's class built
        with the same constructor arguments."""
        return (
            isinstance(other, self.__class__)
            and self._constructor_args == other._constructor_args
        )

    def __hash__(self):
        # Identity-based hash. ``__eq__`` compares constructor arguments, so
        # two distinct managers that compare equal still hash differently.
        return id(self)
160
161
class Manager(BaseManager.from_queryset(QuerySet)):
    """Manager class built from ``BaseManager`` and ``QuerySet`` via ``from_queryset()``."""
164
165
class ManagerDescriptor:
    """
    Descriptor that exposes a manager on a model class but not on instances.

    Class-level access returns the manager registered under the wrapped
    manager's name in ``cls._meta.managers_map``; instance-level access
    raises ``AttributeError``.
    """

    def __init__(self, manager):
        """Remember the manager whose ``name`` is used for the lookup."""
        self.manager = manager

    def __get__(self, instance, cls=None):
        """
        Return the manager for ``cls``.

        Raise AttributeError when accessed through a model instance.
        """
        if instance is not None:
            # The descriptor protocol allows the owner to be omitted when an
            # instance is supplied, so fall back to the instance's type to
            # keep the error message meaningful.
            owner = type(instance) if cls is None else cls
            raise AttributeError(
                f"Manager isn't accessible via {owner.__name__} instances"
            )

        return cls._meta.managers_map[self.manager.name]