1"""
  2Functions for creating and restoring url-safe signed JSON objects.
  3
  4The format used looks like this:
  5
  6>>> signing.dumps("hello")
  7'ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk'
  8
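There are three components here, separated by ':'. The first component is a
URL-safe base64 encoded JSON of the object passed to dumps(). The second
component is the signing time, a base62 encoded Unix timestamp. The third
component is a base64 encoded HMAC (SHA-256 by default) computed over
"$first_component:$second_component" using the salt and the secret key.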
 12
 13signing.loads(s) checks the signature and returns the deserialized object.
 14If the signature fails, a BadSignature exception is raised.
 15
 16>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk")
 17'hello'
 18>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv42-modified")
 19...
BadSignature: Signature "YIye-ze3TTx7gtSv42-modified" does not match
 21
 22You can optionally compress the JSON prior to base64 encoding it to save
 23space, using the compress=True argument. This checks if compression actually
 24helps and only applies compression if the result is a shorter string:
 25
 26>>> signing.dumps(list(range(1, 20)), compress=True)
 27'.eJwFwcERACAIwLCF-rCiILN47r-GyZVJsNgkxaFxoDgxcOHGxMKD_T7vhAml:1QaUaL:BA0thEZrp4FQVXIXuOvYJtLJSrQ'
 28
 29The fact that the string is compressed is signalled by the prefixed '.' at the
 30start of the base64 JSON.
 31
 32There are 65 url-safe characters: the 64 used by url-safe base64 and the ':'.
 33These functions make use of all of them.
 34"""
 35
 36from __future__ import annotations
 37
 38import base64
 39import datetime
 40import hmac
 41import json
 42import time
 43import zlib
 44from typing import Any
 45
 46from plain.runtime import settings
 47from plain.utils.crypto import salted_hmac
 48from plain.utils.encoding import force_bytes
 49from plain.utils.regex_helper import _lazy_re_compile
 50
# Matches a separator that is empty or made up solely of characters from the
# class below; Signer.__init__ rejects any separator this pattern matches.
# Note that the range "A-z" also spans the ASCII punctuation that sits between
# "Z" and "a" ([ \ ] ^ _ `), so those characters count as unsafe as well.
_SEP_UNSAFE = _lazy_re_compile(r"^[A-z0-9-_=]*$")
# Digit alphabet shared by b62_encode() and b62_decode(); a character's index
# in this string is its numeric value.
BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
 53
 54
 55class BadSignature(Exception):
 56    """Signature does not match."""
 57
 58    pass
 59
 60
 61class SignatureExpired(BadSignature):
 62    """Signature timestamp is older than required max_age."""
 63
 64    pass
 65
 66
 67def b62_encode(s: int) -> str:
 68    if s == 0:
 69        return "0"
 70    sign = "-" if s < 0 else ""
 71    s = abs(s)
 72    encoded = ""
 73    while s > 0:
 74        s, remainder = divmod(s, 62)
 75        encoded = BASE62_ALPHABET[remainder] + encoded
 76    return sign + encoded
 77
 78
 79def b62_decode(s: str) -> int:
 80    if s == "0":
 81        return 0
 82    sign = 1
 83    if s[0] == "-":
 84        s = s[1:]
 85        sign = -1
 86    decoded = 0
 87    for digit in s:
 88        decoded = decoded * 62 + BASE62_ALPHABET.index(digit)
 89    return sign * decoded
 90
 91
 92def b64_encode(s: bytes) -> bytes:
 93    return base64.urlsafe_b64encode(s).strip(b"=")
 94
 95
 96def b64_decode(s: bytes) -> bytes:
 97    pad = b"=" * (-len(s) % 4)
 98    return base64.urlsafe_b64decode(s + pad)
 99
100
def base64_hmac(salt: str, value: str, key: str, algorithm: str = "sha1") -> str:
    """
    Return the salted HMAC of ``value`` as unpadded URL-safe base64 text.

    The digest comes from salted_hmac(salt, value, key, algorithm=algorithm)
    and is encoded with b64_encode().
    """
    mac = salted_hmac(salt, value, key, algorithm=algorithm)
    return b64_encode(mac.digest()).decode()
105
106
class JSONSerializer:
    """
    Thin wrapper around the json module, used as the default serializer by
    signing.dumps and signing.loads.
    """

    def dumps(self, obj: Any) -> bytes:
        """Serialize ``obj`` to compact JSON (no whitespace) as latin-1 bytes."""
        text = json.dumps(obj, separators=(",", ":"))
        return text.encode("latin-1")

    def loads(self, data: bytes) -> Any:
        """Decode latin-1 ``data`` and parse it as JSON."""
        text = data.decode("latin-1")
        return json.loads(text)
118
119
def dumps(
    obj: Any,
    key: str | None = None,
    salt: str = "plain.signing",
    serializer: type[JSONSerializer] = JSONSerializer,
    compress: bool = False,
) -> str:
    """
    Serialize ``obj`` and return it as a URL-safe, timestamped, HMAC-signed
    string.

    The work is delegated to TimestampSigner.sign_object(). When key is None,
    settings.SECRET_KEY is used. The HMAC algorithm is the signer's default.

    If compress is True (not the default), the serialized data is zlib
    compressed when that actually makes it shorter. A leading '.' marks
    compressed data; it is part of the signed string, which protects against
    zip bombs.

    Salt namespaces the signature, so a signed string is only valid for the
    namespace it was created in. Leaving the salt at its default or sharing
    one salt across unrelated parts of an application without good cause is
    a security risk.

    The serializer is expected to return a bytestring.
    """
    signer = TimestampSigner(key=key, salt=salt)
    return signer.sign_object(obj, serializer=serializer, compress=compress)
146
147
def loads(
    s: str,
    key: str | None = None,
    salt: str = "plain.signing",
    serializer: type[JSONSerializer] = JSONSerializer,
    max_age: int | float | datetime.timedelta | None = None,
    fallback_keys: list[str] | None = None,
) -> Any:
    """
    Reverse of dumps(): verify ``s`` and return the deserialized object.

    Raise BadSignature if the signature check fails. When max_age is given,
    it is passed on to TimestampSigner.unsign_object() to reject signatures
    that are too old.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key=key, salt=salt, fallback_keys=fallback_keys)
    return signer.unsign_object(s, serializer=serializer, max_age=max_age)
168
169
class Signer:
    """
    Sign strings with a salted HMAC and verify them again.

    A signed value has the form ``<value><sep><signature>``. Verification
    accepts a signature made with the primary key or any fallback key.
    """

    def __init__(
        self,
        *,
        key: str | None = None,
        sep: str = ":",
        salt: str | None = None,
        algorithm: str = "sha256",
        fallback_keys: list[str] | None = None,
    ) -> None:
        """
        Configure the signer.

        A falsy key falls back to settings.SECRET_KEY, fallback_keys=None to
        settings.SECRET_KEY_FALLBACKS, and a falsy salt to the dotted path of
        this instance's class. Raise ValueError if sep matches _SEP_UNSAFE.
        """
        self.key = key if key else settings.SECRET_KEY
        if fallback_keys is None:
            fallback_keys = settings.SECRET_KEY_FALLBACKS
        self.fallback_keys = fallback_keys
        self.sep = sep
        if not salt:
            cls = self.__class__
            salt = f"{cls.__module__}.{cls.__name__}"
        self.salt = salt
        self.algorithm = algorithm

        if _SEP_UNSAFE.match(self.sep):
            raise ValueError(
                f"Unsafe Signer separator: {sep!r} (cannot be empty or consist of "
                "only A-z0-9-_=)",
            )

    def signature(self, value: str, key: str | None = None) -> str:
        """Return the signature of ``value``, using ``key`` or else self.key."""
        signing_key = key if key else self.key
        return base64_hmac(
            self.salt + "signer", value, signing_key, algorithm=self.algorithm
        )

    def sign(self, value: str) -> str:
        """Return ``value`` followed by the separator and its signature."""
        return f"{value}{self.sep}{self.signature(value)}"

    def unsign(self, signed_value: str) -> str:
        """
        Verify ``signed_value`` and return the value without its signature.

        Raise BadSignature if the separator is missing or if the signature
        matches neither the primary key nor any fallback key.
        """
        # Split on the last separator: the signature is always the final part.
        value, found, sig = signed_value.rpartition(self.sep)
        if not found:
            raise BadSignature(f'No "{self.sep}" found in value')
        supplied = force_bytes(sig)
        candidate_keys = (self.key, *self.fallback_keys)
        # compare_digest keeps the comparison constant-time.
        if any(
            hmac.compare_digest(supplied, force_bytes(self.signature(value, candidate)))
            for candidate in candidate_keys
        ):
            return value
        raise BadSignature(f'Signature "{sig}" does not match')

    def sign_object(
        self,
        obj: Any,
        serializer: type[JSONSerializer] = JSONSerializer,
        compress: bool = False,
    ) -> str:
        """
        Serialize ``obj`` and return it as a URL-safe, HMAC-signed string.

        If compress is True (not the default), the serialized data is zlib
        compressed when that saves space. A leading '.' marks compressed
        data; it is part of the signed string, which protects against zip
        bombs.

        The serializer is expected to return a bytestring.
        """
        payload = serializer().dumps(obj)
        marker = ""

        if compress:
            deflated = zlib.compress(payload)
            # Only keep the compressed form when it is shorter even after
            # accounting for the one-character marker.
            if len(deflated) < (len(payload) - 1):
                payload = deflated
                marker = "."
        return self.sign(marker + b64_encode(payload).decode())

    def unsign_object(
        self,
        signed_obj: str,
        serializer: type[JSONSerializer] = JSONSerializer,
        **kwargs: Any,
    ) -> Any:
        """
        Verify ``signed_obj`` and return the deserialized object.

        Extra keyword arguments are forwarded to unsign(); Signer.unsign()
        itself accepts none.
        """
        # unsign() returns str, but base64 and zlib operate on bytes.
        encoded = self.unsign(signed_obj, **kwargs).encode()
        is_compressed = encoded.startswith(b".")
        if is_compressed:
            # Drop the compression marker before decoding.
            encoded = encoded[1:]
        raw = b64_decode(encoded)
        payload = zlib.decompress(raw) if is_compressed else raw
        return serializer().loads(payload)
261
262
class TimestampSigner:
    """A signer that includes a timestamp for max_age validation.

    Wraps a Signer (composition rather than inheritance) because the
    interface intentionally differs: unsign() accepts a max_age parameter.
    Signed values have the form ``<value><sep><timestamp><sep><signature>``.
    """

    def __init__(
        self,
        *,
        key: str | None = None,
        sep: str = ":",
        salt: str | None = None,
        algorithm: str = "sha256",
        fallback_keys: list[str] | None = None,
    ) -> None:
        """Build the wrapped Signer, defaulting the salt to this class's path."""
        # Resolve the default salt here for backwards compatibility. When
        # TimestampSigner inherited from Signer, the default salt was
        # "plain.signing.TimestampSigner"; with composition it has to be set
        # explicitly, otherwise Signer would derive one from its own class.
        if salt is None:
            cls = self.__class__
            salt = f"{cls.__module__}.{cls.__name__}"
        self._signer = Signer(
            key=key,
            sep=sep,
            salt=salt,
            algorithm=algorithm,
            fallback_keys=fallback_keys,
        )

    @property
    def sep(self) -> str:
        """The separator used by the wrapped Signer."""
        return self._signer.sep

    def timestamp(self) -> str:
        """Return the current Unix time in whole seconds, base62 encoded."""
        now = int(time.time())
        return b62_encode(now)

    def sign(self, value: str) -> str:
        """Append the current timestamp to ``value`` and sign the result."""
        return self._signer.sign(f"{value}{self.sep}{self.timestamp()}")

    def unsign(
        self, value: str, max_age: int | float | datetime.timedelta | None = None
    ) -> str:
        """
        Verify ``value`` and return the original value without timestamp.

        When max_age (seconds or a timedelta) is given, raise
        SignatureExpired if the value was signed longer ago than that.
        """
        verified = self._signer.unsign(value)
        # NOTE(review): a validly signed value that has no timestamp part
        # makes this unpacking raise ValueError rather than BadSignature.
        original, encoded_ts = verified.rsplit(self.sep, 1)
        # Always decode the timestamp, even when no max_age check follows.
        signed_at = b62_decode(encoded_ts)
        if max_age is None:
            return original
        if isinstance(max_age, datetime.timedelta):
            max_age = max_age.total_seconds()
        age = time.time() - signed_at
        if age > max_age:
            raise SignatureExpired(f"Signature age {age} > {max_age} seconds")
        return original

    def sign_object(
        self,
        obj: Any,
        serializer: type[JSONSerializer] = JSONSerializer,
        compress: bool = False,
    ) -> str:
        """
        Serialize ``obj`` and return it as a URL-safe, timestamped,
        HMAC-signed string.

        If compress is True (not the default), the serialized data is zlib
        compressed when that saves space. A leading '.' marks compressed
        data; it is part of the signed string, which protects against zip
        bombs.

        The serializer is expected to return a bytestring.
        """
        payload = serializer().dumps(obj)
        marker = ""

        if compress:
            deflated = zlib.compress(payload)
            # Only keep the compressed form when it is shorter even after
            # accounting for the one-character marker.
            if len(deflated) < (len(payload) - 1):
                payload = deflated
                marker = "."
        return self.sign(marker + b64_encode(payload).decode())

    def unsign_object(
        self,
        signed_obj: str,
        serializer: type[JSONSerializer] = JSONSerializer,
        max_age: int | float | datetime.timedelta | None = None,
    ) -> Any:
        """Unsign and decode an object, optionally checking max_age."""
        # unsign() returns str, but base64 and zlib operate on bytes.
        encoded = self.unsign(signed_obj, max_age=max_age).encode()
        is_compressed = encoded.startswith(b".")
        if is_compressed:
            # Drop the compression marker before decoding.
            encoded = encoded[1:]
        raw = b64_decode(encoded)
        payload = zlib.decompress(raw) if is_compressed else raw
        return serializer().loads(payload)