1from __future__ import annotations
 2
 3# Copyright (c) Kenneth Reitz & individual contributors
 4# All rights reserved.
 5# Redistribution and use in source and binary forms, with or without modification,
 6# are permitted provided that the following conditions are met:
 7#     1. Redistributions of source code must retain the above copyright notice,
 8#        this list of conditions and the following disclaimer.
 9#     2. Redistributions in binary form must reproduce the above copyright
10#        notice, this list of conditions and the following disclaimer in the
11#        documentation and/or other materials provided with the distribution.
12#     3. Neither the name of Plain nor the names of its contributors may be used
13#        to endorse or promote products derived from this software without
14#        specific prior written permission.
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
19# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
22# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25import urllib.parse as urlparse
26
27from .connections import DatabaseConfig
28
# URL schemes accepted as PostgreSQL database URLs.
SCHEMES = {"postgres", "postgresql", "pgsql"}

# Register database schemes in URLs.
# ``uses_netloc`` is a process-wide list, so skip schemes that are already
# present: re-executing this module (e.g. via importlib.reload) must not keep
# appending duplicates. Sorting makes the registration order deterministic.
for scheme in sorted(SCHEMES):
    if scheme not in urlparse.uses_netloc:
        urlparse.uses_netloc.append(scheme)
34
35
def parse_database_url(url: str) -> DatabaseConfig:
    """Parse a PostgreSQL database URL into a configuration dictionary.

    Args:
        url: A URL whose scheme is one of ``SCHEMES``, for example
            ``postgres://user:secret@localhost:5432/mydb?sslmode=require``.
            The host may be percent-encoded (e.g. ``%2Fvar%2Frun%2Fpostgresql``
            for a socket directory).

    Returns:
        A mapping with the keys ``DATABASE``, ``USER``, ``PASSWORD``, ``HOST``
        and ``PORT`` (``PORT`` is ``None`` when the URL has no port).
        ``OPTIONS`` is added only when the URL carries query parameters; when
        a parameter is repeated, its last value wins.

    Raises:
        ValueError: If the URL scheme is not listed in ``SCHEMES``, or if
            ``urlsplit`` rejects the port as invalid.
    """
    spliturl = urlparse.urlsplit(url)

    if spliturl.scheme not in SCHEMES:
        raise ValueError(
            f"No support for '{spliturl.scheme}'. We support: {', '.join(sorted(SCHEMES))}"
        )

    # Drop the leading "/" of the path; the remainder is the database name.
    path = spliturl.path[1:]
    query = urlparse.parse_qs(spliturl.query)

    # Handle percent-encoded hostnames (e.g. socket paths).
    hostname = spliturl.hostname or ""
    if "%" in hostname:
        # ``hostname`` is lowercased by urlsplit, which would corrupt a
        # case-sensitive path, so rebuild the host from the raw netloc.
        raw_host = spliturl.netloc.rsplit("@", 1)[-1]  # strip credentials
        # The raw netloc still carries ":port"; remove it so the port does
        # not leak into HOST (it is reported separately as PORT).
        head, sep, tail = raw_host.rpartition(":")
        if sep and tail.isdigit():
            raw_host = head
        hostname = urlparse.unquote(raw_host)

    parsed_config: DatabaseConfig = {
        "DATABASE": urlparse.unquote(path or ""),
        "USER": urlparse.unquote(spliturl.username or ""),
        "PASSWORD": urlparse.unquote(spliturl.password or ""),
        "HOST": hostname,
        "PORT": spliturl.port,
    }

    # Pass the query string into OPTIONS (last value wins for repeated keys).
    options = {key: values[-1] for key, values in query.items()}
    if options:
        parsed_config["OPTIONS"] = options

    return parsed_config
71
72
def build_database_url(config: DatabaseConfig) -> str:
    """Build a ``postgresql://`` URL from a configuration dictionary.

    Args:
        config: Mapping that may contain ``USER``, ``PASSWORD``, ``HOST``,
            ``PORT``, ``DATABASE`` and ``OPTIONS``. Missing or ``None``
            entries are treated as empty.

    Returns:
        The URL string. Credentials, host and database name are
        percent-encoded; ``OPTIONS`` items become the query string.
    """
    options = config.get("OPTIONS") or {}
    query = urlparse.urlencode(list(options.items()))

    # safe="" so that "/" inside credentials is encoded too; a literal "/"
    # would end the netloc early and corrupt the URL.
    user = urlparse.quote(str(config.get("USER") or ""), safe="")
    password = urlparse.quote(str(config.get("PASSWORD") or ""), safe="")
    # Encode path-like hosts (e.g. a socket directory) so they stay inside the
    # netloc, while keeping IPv6 brackets, colons and commas literal.
    host = urlparse.quote(str(config.get("HOST") or ""), safe="[]:,")
    port = config.get("PORT")
    name = urlparse.quote(str(config.get("DATABASE") or ""))

    credentials = ""
    if user or password:
        credentials = f"{user}:{password}@" if password else f"{user}@"

    netloc = f"{credentials}{host}"
    if port:
        netloc += f":{port}"

    return urlparse.urlunsplit(("postgresql", netloc, f"/{name}", query, ""))