Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import time
  4import zlib
  5from typing import Any
  6
  7from plain.signing import (
  8    JSONSerializer,
  9    SignatureExpired,
 10    Signer,
 11    b62_decode,
 12    b62_encode,
 13    b64_decode,
 14    b64_encode,
 15)
 16
 17
 18class ExpiringSigner(Signer):
 19    """A signer with an embedded expiration (vs max age unsign)"""
 20
 21    def sign(self, value: str, expires_in: int) -> str:
 22        timestamp = b62_encode(int(time.time() + expires_in))
 23        value = f"{value}{self.sep}{timestamp}"
 24        return super().sign(value)
 25
 26    def unsign(self, value: str) -> str:
 27        """
 28        Retrieve original value and check the expiration hasn't passed.
 29        """
 30        result = super().unsign(value)
 31        value, timestamp = result.rsplit(self.sep, 1)
 32        timestamp = b62_decode(timestamp)
 33        if timestamp < time.time():
 34            raise SignatureExpired("Signature expired")
 35        return value
 36
 37    def sign_object(
 38        self,
 39        obj: Any,
 40        serializer: type = JSONSerializer,
 41        compress: bool = False,
 42        expires_in: int | None = None,
 43    ) -> str:
 44        """
 45        Return URL-safe, hmac signed base64 compressed JSON string.
 46
 47        If compress is True (not the default), check if compressing using zlib
 48        can save some space. Prepend a '.' to signify compression. This is
 49        included in the signature, to protect against zip bombs.
 50
 51        The serializer is expected to return a bytestring.
 52        """
 53        data = serializer().dumps(obj)
 54        # Flag for if it's been compressed or not.
 55        is_compressed = False
 56
 57        if compress:
 58            # Avoid zlib dependency unless compress is being used.
 59            compressed = zlib.compress(data)
 60            if len(compressed) < (len(data) - 1):
 61                data = compressed
 62                is_compressed = True
 63        base64d = b64_encode(data).decode()
 64        if is_compressed:
 65            base64d = "." + base64d
 66        return self.sign(base64d, expires_in)
 67
 68    def unsign_object(self, signed_obj: str, serializer: type = JSONSerializer) -> Any:
 69        # Signer.unsign() returns str but base64 and zlib compression operate
 70        # on bytes.
 71        base64d = self.unsign(signed_obj).encode()
 72        decompress = base64d[:1] == b"."
 73        if decompress:
 74            # It's compressed; uncompress it first.
 75            base64d = base64d[1:]
 76        data = b64_decode(base64d)
 77        if decompress:
 78            data = zlib.decompress(data)
 79        return serializer().loads(data)
 80
 81
 82def dumps(
 83    obj: Any,
 84    key: str | None = None,
 85    salt: str = "plain.loginlink",
 86    serializer: type = JSONSerializer,
 87    compress: bool = False,
 88    expires_in: int | None = None,
 89) -> str:
 90    """
 91    Return URL-safe, hmac signed base64 compressed JSON string. If key is
 92    None, use settings.SECRET_KEY instead. The hmac algorithm is the default
 93    Signer algorithm.
 94
 95    If compress is True (not the default), check if compressing using zlib can
 96    save some space. Prepend a '.' to signify compression. This is included
 97    in the signature, to protect against zip bombs.
 98
 99    Salt can be used to namespace the hash, so that a signed string is
100    only valid for a given namespace. Leaving this at the default
101    value or re-using a salt value across different parts of your
102    application without good cause is a security risk.
103
104    The serializer is expected to return a bytestring.
105    """
106    return ExpiringSigner(key=key, salt=salt).sign_object(
107        obj, serializer=serializer, compress=compress, expires_in=expires_in
108    )
109
110
111def loads(
112    s: str,
113    key: str | None = None,
114    salt: str = "plain.loginlink",
115    serializer: type = JSONSerializer,
116    fallback_keys: list[str] | None = None,
117) -> Any:
118    """
119    Reverse of dumps(), raise BadSignature if signature fails.
120
121    The serializer is expected to accept a bytestring.
122    """
123    return ExpiringSigner(
124        key=key, salt=salt, fallback_keys=fallback_keys
125    ).unsign_object(
126        s,
127        serializer=serializer,
128    )