Plain is headed towards 1.0! Subscribe for development updates →

  1"""
  2Functions for creating and restoring url-safe signed JSON objects.
  3
  4The format used looks like this:
  5
  6>>> signing.dumps("hello")
  7'ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk'
  8
  9There are three components here, separated by ':'. The first is a URL-safe
 10base64 encoded JSON of the object passed to dumps(), the second a base62
 11encoded timestamp, and the third a base64 encoded HMAC/SHA-256 signature.
 12
 13signing.loads(s) checks the signature and returns the deserialized object.
 14If the signature fails, a BadSignature exception is raised.
 15
 16>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk")
 17'hello'
 18>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv42-modified")
 19...
 20BadSignature: Signature "ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv42-modified" does not match
 21
 22You can optionally compress the JSON prior to base64 encoding it to save
 23space, using the compress=True argument. This checks if compression actually
 24helps and only applies compression if the result is a shorter string:
 25
 26>>> signing.dumps(list(range(1, 20)), compress=True)
 27'.eJwFwcERACAIwLCF-rCiILN47r-GyZVJsNgkxaFxoDgxcOHGxMKD_T7vhAml:1QaUaL:BA0thEZrp4FQVXIXuOvYJtLJSrQ'
 28
 29The fact that the string is compressed is signalled by the prefixed '.' at the
 30start of the base64 JSON.
 31
 32There are 65 url-safe characters: the 64 used by url-safe base64 and the ':'.
 33These functions make use of all of them.
 34"""
 35
 36import base64
 37import datetime
 38import json
 39import time
 40import zlib
 41
 42from plain.runtime import settings
 43from plain.utils.crypto import constant_time_compare, salted_hmac
 44from plain.utils.regex_helper import _lazy_re_compile
 45
# Matches a separator that is empty or built only from the characters in the
# class below; Signer.__init__ raises ValueError for any separator it matches.
# NOTE(review): the range ``A-z`` spans ASCII 65-122, so besides letters it
# also covers ``[ \ ] ^ _`` and the backtick -- presumably kept as-is for
# compatibility; confirm before narrowing it to ``A-Za-z``.
_SEP_UNSAFE = _lazy_re_compile(r"^[A-z0-9-_=]*$")
# Digit alphabet shared by b62_encode()/b62_decode(): a character's index in
# this string is its base-62 digit value.
BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
 48
 49
 50class BadSignature(Exception):
 51    """Signature does not match."""
 52
 53    pass
 54
 55
 56class SignatureExpired(BadSignature):
 57    """Signature timestamp is older than required max_age."""
 58
 59    pass
 60
 61
 62def b62_encode(s):
 63    if s == 0:
 64        return "0"
 65    sign = "-" if s < 0 else ""
 66    s = abs(s)
 67    encoded = ""
 68    while s > 0:
 69        s, remainder = divmod(s, 62)
 70        encoded = BASE62_ALPHABET[remainder] + encoded
 71    return sign + encoded
 72
 73
 74def b62_decode(s):
 75    if s == "0":
 76        return 0
 77    sign = 1
 78    if s[0] == "-":
 79        s = s[1:]
 80        sign = -1
 81    decoded = 0
 82    for digit in s:
 83        decoded = decoded * 62 + BASE62_ALPHABET.index(digit)
 84    return sign * decoded
 85
 86
 87def b64_encode(s):
 88    return base64.urlsafe_b64encode(s).strip(b"=")
 89
 90
 91def b64_decode(s):
 92    pad = b"=" * (-len(s) % 4)
 93    return base64.urlsafe_b64decode(s + pad)
 94
 95
 96def base64_hmac(salt, value, key, algorithm="sha1"):
 97    return b64_encode(
 98        salted_hmac(salt, value, key, algorithm=algorithm).digest()
 99    ).decode()
100
101
class JSONSerializer:
    """
    Thin JSON (de)serializer; the default serializer for signing.dumps and
    signing.loads.
    """

    def dumps(self, obj):
        """Serialize ``obj`` to compact JSON and return it as latin-1 bytes."""
        text = json.dumps(obj, separators=(",", ":"))
        return text.encode("latin-1")

    def loads(self, data):
        """Decode the latin-1 bytes ``data`` and parse the result as JSON."""
        text = data.decode("latin-1")
        return json.loads(text)
113
114
def dumps(
    obj, key=None, salt="plain.signing", serializer=JSONSerializer, compress=False
):
    """
    Serialize ``obj`` and return it as a URL-safe, timestamped, HMAC-signed
    string. When key is None, the signer falls back to settings.SECRET_KEY.
    The HMAC algorithm is the Signer default.

    With compress=True (off by default) the serialized data is run through
    zlib and the compressed form is used only if it is actually shorter. A
    leading '.' marks compressed data; the marker is covered by the
    signature, to protect against zip bombs.

    The salt namespaces the signature, so a signed string is only valid
    within that namespace. Keeping the default salt, or sharing one salt
    between unrelated parts of an application without good cause, is a
    security risk.

    The serializer is expected to return a bytestring.
    """
    signer = TimestampSigner(key=key, salt=salt)
    return signer.sign_object(obj, serializer=serializer, compress=compress)
137
138
def loads(
    s,
    key=None,
    salt="plain.signing",
    serializer=JSONSerializer,
    max_age=None,
    fallback_keys=None,
):
    """
    Inverse of dumps(): verify the signed string ``s`` and return the
    deserialized object.

    Raise BadSignature if the signature does not verify, and
    SignatureExpired if max_age is given and the embedded timestamp is
    older than that.

    The serializer is expected to accept a bytestring.
    """
    signer = TimestampSigner(key=key, salt=salt, fallback_keys=fallback_keys)
    return signer.unsign_object(s, serializer=serializer, max_age=max_age)
159
160
class Signer:
    """
    Sign values with a salted HMAC and verify them again.

    A signed value has the form ``<value><sep><signature>``.
    """

    def __init__(
        self,
        *,
        key=None,
        sep=":",
        salt=None,
        algorithm="sha256",
        fallback_keys=None,
    ):
        """
        Configure the signer.

        key defaults to settings.SECRET_KEY and fallback_keys to
        settings.SECRET_KEY_FALLBACKS. When no salt is given, the dotted
        path "<module>.<class name>" of the signer class is used.

        Raise ValueError if ``sep`` matches _SEP_UNSAFE.
        """
        self.key = key or settings.SECRET_KEY
        if fallback_keys is None:
            fallback_keys = settings.SECRET_KEY_FALLBACKS
        self.fallback_keys = fallback_keys
        self.sep = sep
        if not salt:
            cls = self.__class__
            salt = f"{cls.__module__}.{cls.__name__}"
        self.salt = salt
        self.algorithm = algorithm

        if _SEP_UNSAFE.match(self.sep):
            raise ValueError(
                f"Unsafe Signer separator: {sep!r} (cannot be empty or consist of "
                "only A-z0-9-_=)",
            )

    def signature(self, value, key=None):
        """Return the signature of ``value`` under ``key`` (default: self.key)."""
        signing_key = key or self.key
        return base64_hmac(
            self.salt + "signer", value, signing_key, algorithm=self.algorithm
        )

    def sign(self, value):
        """Return ``value`` followed by the separator and its signature."""
        sig = self.signature(value)
        return f"{value}{self.sep}{sig}"

    def unsign(self, signed_value):
        """
        Return the original value if its signature verifies against the
        primary key or any fallback key; otherwise raise BadSignature.
        """
        if self.sep not in signed_value:
            raise BadSignature(f'No "{self.sep}" found in value')
        # The separator is known to be present, so the split at its last
        # occurrence always yields a value part and a signature part.
        value, _, sig = signed_value.rpartition(self.sep)
        candidate_keys = [self.key, *self.fallback_keys]
        if any(
            constant_time_compare(sig, self.signature(value, candidate))
            for candidate in candidate_keys
        ):
            return value
        raise BadSignature(f'Signature "{sig}" does not match')

    def sign_object(self, obj, serializer=JSONSerializer, compress=False):
        """
        Serialize ``obj``, base64 encode it and return the signed string.

        With compress=True (off by default) the serialized data is zlib
        compressed, and the compressed form is kept only if it is shorter.
        A leading '.' marks compressed data; the marker is covered by the
        signature, to protect against zip bombs.

        The serializer is expected to return a bytestring.
        """
        payload = serializer().dumps(obj)
        # "." when the payload ends up compressed, empty otherwise.
        marker = ""

        if compress:
            squeezed = zlib.compress(payload)
            # Keep the compressed form only when it is more than one byte
            # shorter than the uncompressed payload.
            if len(squeezed) < (len(payload) - 1):
                payload = squeezed
                marker = "."
        return self.sign(marker + b64_encode(payload).decode())

    def unsign_object(self, signed_obj, serializer=JSONSerializer, **kwargs):
        """
        Verify ``signed_obj`` and return the deserialized object.

        Extra keyword arguments are forwarded to unsign().
        """
        # unsign() returns str, while base64 and zlib operate on bytes.
        encoded = self.unsign(signed_obj, **kwargs).encode()
        if encoded.startswith(b"."):
            # The leading "." marks zlib-compressed data.
            raw = zlib.decompress(b64_decode(encoded[1:]))
        else:
            raw = b64_decode(encoded)
        return serializer().loads(raw)
240
241
class TimestampSigner(Signer):
    """Signer that appends a base62 timestamp to the value before signing."""

    def timestamp(self):
        """Return the current time, in whole seconds, base62 encoded."""
        now = int(time.time())
        return b62_encode(now)

    def sign(self, value):
        """Append the separator and a timestamp to ``value``, then sign it."""
        stamped = f"{value}{self.sep}{self.timestamp()}"
        return super().sign(stamped)

    def unsign(self, value, max_age=None):
        """
        Verify ``value`` and return the original value without its timestamp.

        If max_age (seconds or a datetime.timedelta) is given, raise
        SignatureExpired when the value was signed more than max_age ago.
        """
        stamped = super().unsign(value)
        original, encoded_ts = stamped.rsplit(self.sep, 1)
        signed_at = b62_decode(encoded_ts)
        if max_age is None:
            return original
        if isinstance(max_age, datetime.timedelta):
            max_age = max_age.total_seconds()
        # Reject signatures older than max_age.
        age = time.time() - signed_at
        if age > max_age:
            raise SignatureExpired(f"Signature age {age} > {max_age} seconds")
        return original