Plain is headed towards 1.0! Subscribe for development updates →

  1"""
  2This module converts requested URLs to callback view functions.
  3
  4URLResolver is the main class here. Its resolve() method takes a URL (as
  5a string) and returns a ResolverMatch object which provides access to all
  6attributes of the resolved URL match.
  7"""
  8
  9import functools
 10import re
 11from threading import local
 12from urllib.parse import quote
 13
 14from plain.preflight.urls import check_resolver
 15from plain.runtime import settings
 16from plain.utils.datastructures import MultiValueDict
 17from plain.utils.http import RFC3986_SUBDELIMS, escape_leading_slashes
 18from plain.utils.module_loading import import_string
 19from plain.utils.regex_helper import normalize
 20
 21from .exceptions import NoReverseMatch, Resolver404
 22from .patterns import RegexPattern, URLPattern
 23
 24
 25class ResolverMatch:
 26    def __init__(
 27        self,
 28        *,
 29        view,
 30        args,
 31        kwargs,
 32        url_name=None,
 33        namespaces=None,
 34        route=None,
 35    ):
 36        self.view = view
 37        self.args = args
 38        self.kwargs = kwargs
 39        self.url_name = url_name
 40        self.route = route
 41
 42        # If a URLRegexResolver doesn't have a namespace or namespace, it passes
 43        # in an empty value.
 44        self.namespaces = [x for x in namespaces if x] if namespaces else []
 45        self.namespace = ":".join(self.namespaces)
 46
 47        self.namespaced_url_name = (
 48            ":".join(self.namespaces + [url_name]) if url_name else None
 49        )
 50
 51
 52def get_resolver(router=None):
 53    if router is None:
 54        router = settings.URLS_ROUTER
 55
 56    return _get_cached_resolver(router)
 57
 58
 59@functools.cache
 60def _get_cached_resolver(router):
 61    if isinstance(router, str):
 62        # Do this inside the cached call, primarily for the URLS_ROUTER
 63        router_class = import_string(router)
 64        router = router_class()
 65
 66    return URLResolver(pattern=RegexPattern(r"^/"), router=router)
 67
 68
 69@functools.cache
 70def get_ns_resolver(ns_pattern, resolver, converters):
 71    from .routers import Router
 72
 73    # Build a namespaced resolver for the given parent urls_module pattern.
 74    # This makes it possible to have captured parameters in the parent
 75    # urls_module pattern.
 76    pattern = RegexPattern(ns_pattern)
 77    pattern.converters = dict(converters)
 78
 79    class _NestedRouter(Router):
 80        namespace = ""
 81        urls = resolver.url_patterns
 82
 83    ns_resolver = URLResolver(pattern=pattern, router=_NestedRouter())
 84
 85    class _NamespacedRouter(Router):
 86        namespace = ""
 87        urls = [ns_resolver]
 88
 89    return URLResolver(
 90        pattern=RegexPattern(r"^/"),
 91        router=_NamespacedRouter(),
 92    )
 93
 94
 95class URLResolver:
 96    def __init__(
 97        self,
 98        *,
 99        pattern,
100        router,
101    ):
102        self.pattern = pattern
103        self.router = router
104        self._reverse_dict = {}
105        self._namespace_dict = {}
106        self._populated = False
107        self._local = local()
108
109        # Set these immediately, in part so we can find routers
110        # where the attributes weren't set correctly.
111        self.namespace = self.router.namespace
112        self.url_patterns = self.router.urls
113
114    def __repr__(self):
115        return f"<{self.__class__.__name__} {repr(self.router)} ({self.namespace}) {self.pattern.describe()}>"
116
117    def check(self):
118        messages = []
119        for pattern in self.url_patterns:
120            messages.extend(check_resolver(pattern))
121        return messages or self.pattern.check()
122
123    def _populate(self):
124        # Short-circuit if called recursively in this thread to prevent
125        # infinite recursion. Concurrent threads may call this at the same
126        # time and will need to continue, so set 'populating' on a
127        # thread-local variable.
128        if getattr(self._local, "populating", False):
129            return
130        try:
131            self._local.populating = True
132            lookups = MultiValueDict()
133            namespaces = {}
134            for url_pattern in reversed(self.url_patterns):
135                p_pattern = url_pattern.pattern.regex.pattern
136                p_pattern = p_pattern.removeprefix("^")
137                if isinstance(url_pattern, URLPattern):
138                    bits = normalize(url_pattern.pattern.regex.pattern)
139                    lookups.appendlist(
140                        url_pattern.view,
141                        (
142                            bits,
143                            p_pattern,
144                            url_pattern.pattern.converters,
145                        ),
146                    )
147                    if url_pattern.name is not None:
148                        lookups.appendlist(
149                            url_pattern.name,
150                            (
151                                bits,
152                                p_pattern,
153                                url_pattern.pattern.converters,
154                            ),
155                        )
156                else:  # url_pattern is a URLResolver.
157                    url_pattern._populate()
158                    if url_pattern.namespace:
159                        namespaces[url_pattern.namespace] = (p_pattern, url_pattern)
160                    else:
161                        for name in url_pattern.reverse_dict:
162                            for (
163                                _,
164                                pat,
165                                converters,
166                            ) in url_pattern.reverse_dict.getlist(name):
167                                new_matches = normalize(p_pattern + pat)
168                                lookups.appendlist(
169                                    name,
170                                    (
171                                        new_matches,
172                                        p_pattern + pat,
173                                        {
174                                            **self.pattern.converters,
175                                            **url_pattern.pattern.converters,
176                                            **converters,
177                                        },
178                                    ),
179                                )
180                        for namespace, (
181                            prefix,
182                            sub_pattern,
183                        ) in url_pattern.namespace_dict.items():
184                            current_converters = url_pattern.pattern.converters
185                            sub_pattern.pattern.converters.update(current_converters)
186                            namespaces[namespace] = (p_pattern + prefix, sub_pattern)
187            self._namespace_dict = namespaces
188            self._reverse_dict = lookups
189            self._populated = True
190        finally:
191            self._local.populating = False
192
193    @property
194    def reverse_dict(self):
195        if not self._reverse_dict:
196            self._populate()
197        return self._reverse_dict
198
199    @property
200    def namespace_dict(self):
201        if not self._namespace_dict:
202            self._populate()
203        return self._namespace_dict
204
205    @staticmethod
206    def _join_route(route1, route2):
207        """Join two routes, without the starting ^ in the second route."""
208        if not route1:
209            return route2
210        route2 = route2.removeprefix("^")
211        return route1 + route2
212
213    def resolve(self, path):
214        path = str(path)  # path may be a reverse_lazy object
215        match = self.pattern.match(path)
216        if match:
217            new_path, args, kwargs = match
218            for pattern in self.url_patterns:
219                try:
220                    sub_match = pattern.resolve(new_path)
221                except Resolver404:
222                    pass
223                else:
224                    if sub_match:
225                        # Merge captured arguments in match with submatch
226                        # Update the sub_match_dict with the kwargs from the sub_match.
227                        sub_match_dict = {**kwargs, **sub_match.kwargs}
228                        # If there are *any* named groups, ignore all non-named groups.
229                        # Otherwise, pass all non-named arguments as positional
230                        # arguments.
231                        sub_match_args = sub_match.args
232                        if not sub_match_dict:
233                            sub_match_args = args + sub_match.args
234                        current_route = (
235                            ""
236                            if isinstance(pattern, URLPattern)
237                            else str(pattern.pattern)
238                        )
239                        return ResolverMatch(
240                            view=sub_match.view,
241                            args=sub_match_args,
242                            kwargs=sub_match_dict,
243                            url_name=sub_match.url_name,
244                            namespaces=[self.namespace] + sub_match.namespaces,
245                            route=self._join_route(current_route, sub_match.route),
246                        )
247            raise Resolver404({"path": new_path})
248        raise Resolver404({"path": path})
249
250    def reverse(self, lookup_view, *args, **kwargs):
251        if args and kwargs:
252            raise ValueError("Don't mix *args and **kwargs in call to reverse()!")
253
254        if not self._populated:
255            self._populate()
256
257        possibilities = self.reverse_dict.getlist(lookup_view)
258
259        for possibility, pattern, converters in possibilities:
260            for result, params in possibility:
261                if args:
262                    if len(args) != len(params):
263                        continue
264                    candidate_subs = dict(zip(params, args))
265                else:
266                    if set(kwargs).symmetric_difference(params):
267                        continue
268                    candidate_subs = kwargs
269                # Convert the candidate subs to text using Converter.to_url().
270                text_candidate_subs = {}
271                match = True
272                for k, v in candidate_subs.items():
273                    if k in converters:
274                        try:
275                            text_candidate_subs[k] = converters[k].to_url(v)
276                        except ValueError:
277                            match = False
278                            break
279                    else:
280                        text_candidate_subs[k] = str(v)
281                if not match:
282                    continue
283                # WSGI provides decoded URLs, without %xx escapes, and the URL
284                # resolver operates on such URLs. First substitute arguments
285                # without quoting to build a decoded URL and look for a match.
286                # Then, if we have a match, redo the substitution with quoted
287                # arguments in order to return a properly encoded URL.
288
289                # There was a lot of script_prefix handling code before,
290                # so this is a crutch to leave the below as-is for now.
291                _prefix = "/"
292
293                candidate_pat = _prefix.replace("%", "%%") + result
294                if re.search(
295                    f"^{re.escape(_prefix)}{pattern}",
296                    candidate_pat % text_candidate_subs,
297                ):
298                    # safe characters from `pchar` definition of RFC 3986
299                    url = quote(
300                        candidate_pat % text_candidate_subs,
301                        safe=RFC3986_SUBDELIMS + "/~:@",
302                    )
303                    # Don't allow construction of scheme relative urls.
304                    return escape_leading_slashes(url)
305        # lookup_view can be URL name or callable, but callables are not
306        # friendly in error messages.
307        m = getattr(lookup_view, "__module__", None)
308        n = getattr(lookup_view, "__name__", None)
309        if m is not None and n is not None:
310            lookup_view_s = f"{m}.{n}"
311        else:
312            lookup_view_s = lookup_view
313
314        patterns = [pos[1] for pos in possibilities]
315        if patterns:
316            if args:
317                arg_msg = f"arguments '{args}'"
318            elif kwargs:
319                arg_msg = f"keyword arguments '{kwargs}'"
320            else:
321                arg_msg = "no arguments"
322            msg = "Reverse for '%s' with %s not found. %d pattern(s) tried: %s" % (  # noqa: UP031
323                lookup_view_s,
324                arg_msg,
325                len(patterns),
326                patterns,
327            )
328        else:
329            msg = (
330                f"Reverse for '{lookup_view_s}' not found. '{lookup_view_s}' is not "
331                "a valid view function or pattern name."
332            )
333        raise NoReverseMatch(msg)