1from __future__ import annotations
2
3import copy
4import itertools
5import operator
6from collections.abc import Callable
7from functools import total_ordering, wraps
8from typing import Any, TypeVar
9
10T = TypeVar("T")
11
12
13class classproperty:
14 """
15 Decorator that converts a method with a single cls argument into a property
16 that can be accessed directly from the class.
17 """
18
19 def __init__(self, method: Callable[[type], Any] | None = None) -> None:
20 self.fget = method
21
22 def __get__(self, instance: Any, cls: type | None = None) -> Any:
23 assert cls is not None, "cls must be provided for classproperty"
24 assert self.fget is not None, "fget must be set before accessing classproperty"
25 return self.fget(cls)
26
27 def getter(self, method: Callable[[type], Any]) -> classproperty:
28 self.fget = method
29 return self
30
31
32class Promise:
33 """
34 Base class for the proxy class created in the closure of the lazy function.
35 It's used to recognize promises in code.
36 """
37
38 pass
39
40
41def lazy(func: Callable[..., Any], *resultclasses: type) -> Callable[..., Any]:
42 """
43 Turn any callable into a lazy evaluated callable. result classes or types
44 is required -- at least one is needed so that the automatic forcing of
45 the lazy evaluation code is triggered. Results are not memoized; the
46 function is evaluated on every access.
47 """
48
49 @total_ordering
50 class __proxy__(Promise):
51 """
52 Encapsulate a function call and act as a proxy for methods that are
53 called on the result of that function. The function is not evaluated
54 until one of the methods on the result is called.
55 """
56
57 __prepared = False
58
59 def __init__(self, args: tuple[Any, ...], kw: dict[str, Any]) -> None:
60 self.__args = args
61 self.__kw = kw
62 if not self.__prepared:
63 self.__prepare_class__()
64 self.__class__.__prepared = True
65
66 def __reduce__(self) -> tuple[Callable[..., Any], tuple[Any, ...]]:
67 return (
68 _lazy_proxy_unpickle,
69 (func, self.__args, self.__kw) + resultclasses,
70 )
71
72 def __repr__(self) -> str:
73 return repr(self.__cast())
74
75 @classmethod
76 def __prepare_class__(cls) -> None:
77 for resultclass in resultclasses:
78 for type_ in resultclass.mro():
79 for method_name in type_.__dict__:
80 # All __promise__ return the same wrapper method, they
81 # look up the correct implementation when called.
82 if hasattr(cls, method_name):
83 continue
84 meth = cls.__promise__(method_name)
85 setattr(cls, method_name, meth)
86 cls._delegate_bytes = bytes in resultclasses
87 cls._delegate_text = str in resultclasses
88 if cls._delegate_bytes and cls._delegate_text:
89 raise ValueError(
90 "Cannot call lazy() with both bytes and text return types."
91 )
92 if cls._delegate_text:
93 setattr(cls, "__str__", cls.__text_cast)
94 elif cls._delegate_bytes:
95 setattr(cls, "__bytes__", cls.__bytes_cast)
96
97 @classmethod
98 def __promise__(cls, method_name: str) -> Callable[..., Any]:
99 # Builds a wrapper around some magic method
100 def __wrapper__(self: Any, *args: Any, **kw: Any) -> Any:
101 # Automatically triggers the evaluation of a lazy value and
102 # applies the given magic method of the result type.
103 res = func(*self.__args, **self.__kw)
104 return getattr(res, method_name)(*args, **kw)
105
106 return __wrapper__
107
108 def __text_cast(self) -> str:
109 return func(*self.__args, **self.__kw)
110
111 def __bytes_cast(self) -> bytes:
112 return bytes(func(*self.__args, **self.__kw))
113
114 def __bytes_cast_encoded(self) -> bytes:
115 return func(*self.__args, **self.__kw).encode()
116
117 def __cast(self) -> Any:
118 if self._delegate_bytes:
119 return self.__bytes_cast()
120 elif self._delegate_text:
121 return self.__text_cast()
122 else:
123 return func(*self.__args, **self.__kw)
124
125 def __str__(self) -> str:
126 # object defines __str__(), so __prepare_class__() won't overload
127 # a __str__() method from the proxied class.
128 return str(self.__cast())
129
130 def __eq__(self, other: Any) -> bool:
131 if isinstance(other, Promise):
132 other = other.__cast()
133 return self.__cast() == other
134
135 def __lt__(self, other: Any) -> bool:
136 if isinstance(other, Promise):
137 other = other.__cast()
138 return self.__cast() < other
139
140 def __hash__(self) -> int:
141 return hash(self.__cast())
142
143 def __mod__(self, rhs: Any) -> Any:
144 if self._delegate_text:
145 return str(self) % rhs
146 return self.__cast() % rhs
147
148 def __add__(self, other: Any) -> Any:
149 return self.__cast() + other
150
151 def __radd__(self, other: Any) -> Any:
152 return other + self.__cast()
153
154 def __deepcopy__(self, memo: dict[int, Any]) -> __proxy__:
155 # Instances of this class are effectively immutable. It's just a
156 # collection of functions. So we don't need to do anything
157 # complicated for copying.
158 memo[id(self)] = self
159 return self
160
161 @wraps(func)
162 def __wrapper__(*args: Any, **kw: Any) -> __proxy__:
163 # Creates the proxy object, instead of the actual value.
164 return __proxy__(args, kw)
165
166 return __wrapper__
167
168
169def _lazy_proxy_unpickle(
170 func: Callable[..., Any],
171 args: tuple[Any, ...],
172 kwargs: dict[str, Any],
173 *resultclasses: type,
174) -> Any:
175 return lazy(func, *resultclasses)(*args, **kwargs)
176
177
178def keep_lazy(
179 *resultclasses: type,
180) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
181 """
182 A decorator that allows a function to be called with one or more lazy
183 arguments. If none of the args are lazy, the function is evaluated
184 immediately, otherwise a __proxy__ is returned that will evaluate the
185 function when needed.
186 """
187 if not resultclasses:
188 raise TypeError("You must pass at least one argument to keep_lazy().")
189
190 def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
191 lazy_func = lazy(func, *resultclasses)
192
193 @wraps(func)
194 def wrapper(*args: Any, **kwargs: Any) -> Any:
195 if any(
196 isinstance(arg, Promise)
197 for arg in itertools.chain(args, kwargs.values())
198 ):
199 return lazy_func(*args, **kwargs)
200 return func(*args, **kwargs)
201
202 return wrapper
203
204 return decorator
205
206
207def keep_lazy_text(func: Callable[..., Any]) -> Callable[..., Any]:
208 """
209 A decorator for functions that accept lazy arguments and return text.
210 """
211 return keep_lazy(str)(func)
212
213
214empty = object()
215
216
217def new_method_proxy(func: Callable[..., Any]) -> Callable[..., Any]:
218 def inner(self: Any, *args: Any) -> Any:
219 if (_wrapped := self._wrapped) is empty:
220 self._setup()
221 _wrapped = self._wrapped
222 return func(_wrapped, *args)
223
224 inner._mask_wrapped = False # ty: ignore[unresolved-attribute]
225 return inner
226
227
228class LazyObject:
229 """
230 A wrapper for another class that can be used to delay instantiation of the
231 wrapped class.
232
233 By subclassing, you have the opportunity to intercept and alter the
234 instantiation. If you don't need to do that, use SimpleLazyObject.
235 """
236
237 # Avoid infinite recursion when tracing __init__ (#19456).
238 _wrapped = None
239
240 def __init__(self):
241 # Note: if a subclass overrides __init__(), it will likely need to
242 # override __copy__() and __deepcopy__() as well.
243 self._wrapped = empty
244
245 def __getattribute__(self, name: str) -> Any:
246 if name == "_wrapped":
247 # Avoid recursion when getting wrapped object.
248 return super().__getattribute__(name)
249 value = super().__getattribute__(name)
250 # If attribute is a proxy method, raise an AttributeError to call
251 # __getattr__() and use the wrapped object method.
252 if not getattr(value, "_mask_wrapped", True):
253 raise AttributeError
254 return value
255
256 __getattr__ = new_method_proxy(getattr)
257
258 def __setattr__(self, name: str, value: Any) -> None:
259 if name == "_wrapped":
260 # Assign to __dict__ to avoid infinite __setattr__ loops.
261 self.__dict__["_wrapped"] = value
262 else:
263 if self._wrapped is empty:
264 self._setup()
265 setattr(self._wrapped, name, value)
266
267 def __delattr__(self, name: str) -> None:
268 if name == "_wrapped":
269 raise TypeError("can't delete _wrapped.")
270 if self._wrapped is empty:
271 self._setup()
272 delattr(self._wrapped, name)
273
274 def _setup(self) -> None:
275 """
276 Must be implemented by subclasses to initialize the wrapped object.
277 """
278 raise NotImplementedError(
279 "subclasses of LazyObject must provide a _setup() method"
280 )
281
282 # Because we have messed with __class__ below, we confuse pickle as to what
283 # class we are pickling. We're going to have to initialize the wrapped
284 # object to successfully pickle it, so we might as well just pickle the
285 # wrapped object since they're supposed to act the same way.
286 #
287 # Unfortunately, if we try to simply act like the wrapped object, the ruse
288 # will break down when pickle gets our id(). Thus we end up with pickle
289 # thinking, in effect, that we are a distinct object from the wrapped
290 # object, but with the same __dict__. This can cause problems (see #25389).
291 #
292 # So instead, we define our own __reduce__ method and custom unpickler. We
293 # pickle the wrapped object as the unpickler's argument, so that pickle
294 # will pickle it normally, and then the unpickler simply returns its
295 # argument.
296 def __reduce__(self) -> tuple[Callable[[Any], Any], tuple[Any, ...]]:
297 if self._wrapped is empty:
298 self._setup()
299 return (unpickle_lazyobject, (self._wrapped,))
300
301 def __copy__(self) -> LazyObject | Any:
302 if self._wrapped is empty:
303 # If uninitialized, copy the wrapper. Use type(self), not
304 # self.__class__, because the latter is proxied.
305 return type(self)()
306 else:
307 # If initialized, return a copy of the wrapped object.
308 return copy.copy(self._wrapped)
309
310 def __deepcopy__(self, memo: dict[int, Any]) -> LazyObject | Any:
311 if self._wrapped is empty:
312 # We have to use type(self), not self.__class__, because the
313 # latter is proxied.
314 result = type(self)()
315 memo[id(self)] = result
316 return result
317 return copy.deepcopy(self._wrapped, memo)
318
319 __bytes__ = new_method_proxy(bytes)
320 __str__ = new_method_proxy(str)
321 __bool__ = new_method_proxy(bool)
322
323 # Introspection support
324 __dir__ = new_method_proxy(dir)
325
326 # Need to pretend to be the wrapped class, for the sake of objects that
327 # care about this (especially in equality tests)
328 __class__ = property(new_method_proxy(operator.attrgetter("__class__")))
329 __eq__ = new_method_proxy(operator.eq)
330 __lt__ = new_method_proxy(operator.lt)
331 __gt__ = new_method_proxy(operator.gt)
332 __ne__ = new_method_proxy(operator.ne)
333 __hash__ = new_method_proxy(hash)
334
335 # List/Tuple/Dictionary methods support
336 __getitem__ = new_method_proxy(operator.getitem)
337 __setitem__ = new_method_proxy(operator.setitem)
338 __delitem__ = new_method_proxy(operator.delitem)
339 __iter__ = new_method_proxy(iter)
340 __len__ = new_method_proxy(len)
341 __contains__ = new_method_proxy(operator.contains)
342
343
344def unpickle_lazyobject(wrapped: Any) -> Any:
345 """
346 Used to unpickle lazy objects. Just return its argument, which will be the
347 wrapped object.
348 """
349 return wrapped
350
351
352class SimpleLazyObject(LazyObject):
353 """
354 A lazy object initialized from any function.
355
356 Designed for compound objects of unknown type. For builtins or objects of
357 known type, use plain.utils.functional.lazy.
358 """
359
360 def __init__(self, func: Callable[[], Any]) -> None:
361 """
362 Pass in a callable that returns the object to be wrapped.
363
364 If copies are made of the resulting SimpleLazyObject, which can happen
365 in various circumstances within Plain, then you must ensure that the
366 callable can be safely run more than once and will return the same
367 value.
368 """
369 self.__dict__["_setupfunc"] = func
370 super().__init__()
371
372 def _setup(self) -> None:
373 self._wrapped = self._setupfunc()
374
375 # Return a meaningful representation of the lazy object for debugging
376 # without evaluating the wrapped object.
377 def __repr__(self) -> str:
378 if self._wrapped is empty:
379 repr_attr = self._setupfunc
380 else:
381 repr_attr = self._wrapped
382 return f"<{type(self).__name__}: {repr_attr!r}>"
383
384 def __copy__(self) -> SimpleLazyObject | Any:
385 if self._wrapped is empty:
386 # If uninitialized, copy the wrapper. Use SimpleLazyObject, not
387 # self.__class__, because the latter is proxied.
388 return SimpleLazyObject(self._setupfunc)
389 else:
390 # If initialized, return a copy of the wrapped object.
391 return copy.copy(self._wrapped)
392
393 def __deepcopy__(self, memo: dict[int, Any]) -> SimpleLazyObject | Any:
394 if self._wrapped is empty:
395 # We have to use SimpleLazyObject, not self.__class__, because the
396 # latter is proxied.
397 result = SimpleLazyObject(self._setupfunc)
398 memo[id(self)] = result
399 return result
400 return copy.deepcopy(self._wrapped, memo)
401
402 __add__ = new_method_proxy(operator.add)
403
404 @new_method_proxy
405 def __radd__(self: Any, other: Any) -> Any:
406 return other + self
407
408
409def partition(
410 predicate: Callable[[Any], bool], values: Any
411) -> tuple[list[Any], list[Any]]:
412 """
413 Split the values into two sets, based on the return value of the function
414 (True/False). e.g.:
415
416 >>> partition(lambda x: x > 3, range(5))
417 [0, 1, 2, 3], [4]
418 """
419 results: tuple[list[Any], list[Any]] = ([], [])
420 for item in values:
421 results[predicate(item)].append(item)
422 return results