1"""
2Functions for creating and restoring url-safe signed JSON objects.
3
4The format used looks like this:
5
6>>> signing.dumps("hello")
7'ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk'
8
9There are two components here, separated by a ':'. The first component is a
10URLsafe base64 encoded JSON of the object passed to dumps(). The second
11component is a base64 encoded hmac/SHA-256 hash of "$first_component:$secret"
12
13signing.loads(s) checks the signature and returns the deserialized object.
14If the signature fails, a BadSignature exception is raised.
15
16>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv422nZA4sgmk")
17'hello'
18>>> signing.loads("ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv42-modified")
19...
20BadSignature: Signature "ImhlbGxvIg:1QaUZC:YIye-ze3TTx7gtSv42-modified" does not match
21
22You can optionally compress the JSON prior to base64 encoding it to save
23space, using the compress=True argument. This checks if compression actually
24helps and only applies compression if the result is a shorter string:
25
26>>> signing.dumps(list(range(1, 20)), compress=True)
27'.eJwFwcERACAIwLCF-rCiILN47r-GyZVJsNgkxaFxoDgxcOHGxMKD_T7vhAml:1QaUaL:BA0thEZrp4FQVXIXuOvYJtLJSrQ'
28
29The fact that the string is compressed is signalled by the prefixed '.' at the
30start of the base64 JSON.
31
32There are 65 url-safe characters: the 64 used by url-safe base64 and the ':'.
33These functions make use of all of them.
34"""
35
36from __future__ import annotations
37
38import base64
39import datetime
40import hmac
41import json
42import time
43import zlib
44from typing import Any
45
46from plain.runtime import settings
47from plain.utils.crypto import salted_hmac
48from plain.utils.encoding import force_bytes
49from plain.utils.regex_helper import _lazy_re_compile
50
51_SEP_UNSAFE = _lazy_re_compile(r"^[A-z0-9-_=]*$")
52BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
53
54
55class BadSignature(Exception):
56 """Signature does not match."""
57
58 pass
59
60
61class SignatureExpired(BadSignature):
62 """Signature timestamp is older than required max_age."""
63
64 pass
65
66
67def b62_encode(s: int) -> str:
68 if s == 0:
69 return "0"
70 sign = "-" if s < 0 else ""
71 s = abs(s)
72 encoded = ""
73 while s > 0:
74 s, remainder = divmod(s, 62)
75 encoded = BASE62_ALPHABET[remainder] + encoded
76 return sign + encoded
77
78
79def b62_decode(s: str) -> int:
80 if s == "0":
81 return 0
82 sign = 1
83 if s[0] == "-":
84 s = s[1:]
85 sign = -1
86 decoded = 0
87 for digit in s:
88 decoded = decoded * 62 + BASE62_ALPHABET.index(digit)
89 return sign * decoded
90
91
92def b64_encode(s: bytes) -> bytes:
93 return base64.urlsafe_b64encode(s).strip(b"=")
94
95
96def b64_decode(s: bytes) -> bytes:
97 pad = b"=" * (-len(s) % 4)
98 return base64.urlsafe_b64decode(s + pad)
99
100
101def base64_hmac(salt: str, value: str, key: str, algorithm: str = "sha1") -> str:
102 return b64_encode(
103 salted_hmac(salt, value, key, algorithm=algorithm).digest()
104 ).decode()
105
106
107class JSONSerializer:
108 """
109 Simple wrapper around json to be used in signing.dumps and
110 signing.loads.
111 """
112
113 def dumps(self, obj: Any) -> bytes:
114 return json.dumps(obj, separators=(",", ":")).encode("latin-1")
115
116 def loads(self, data: bytes) -> Any:
117 return json.loads(data.decode("latin-1"))
118
119
120def dumps(
121 obj: Any,
122 key: str | None = None,
123 salt: str = "plain.signing",
124 serializer: type[JSONSerializer] = JSONSerializer,
125 compress: bool = False,
126) -> str:
127 """
128 Return URL-safe, hmac signed base64 compressed JSON string. If key is
129 None, use settings.SECRET_KEY instead. The hmac algorithm is the default
130 Signer algorithm.
131
132 If compress is True (not the default), check if compressing using zlib can
133 save some space. Prepend a '.' to signify compression. This is included
134 in the signature, to protect against zip bombs.
135
136 Salt can be used to namespace the hash, so that a signed string is
137 only valid for a given namespace. Leaving this at the default
138 value or re-using a salt value across different parts of your
139 application without good cause is a security risk.
140
141 The serializer is expected to return a bytestring.
142 """
143 return TimestampSigner(key=key, salt=salt).sign_object(
144 obj, serializer=serializer, compress=compress
145 )
146
147
148def loads(
149 s: str,
150 key: str | None = None,
151 salt: str = "plain.signing",
152 serializer: type[JSONSerializer] = JSONSerializer,
153 max_age: int | float | datetime.timedelta | None = None,
154 fallback_keys: list[str] | None = None,
155) -> Any:
156 """
157 Reverse of dumps(), raise BadSignature if signature fails.
158
159 The serializer is expected to accept a bytestring.
160 """
161 return TimestampSigner(
162 key=key, salt=salt, fallback_keys=fallback_keys
163 ).unsign_object(
164 s,
165 serializer=serializer,
166 max_age=max_age,
167 )
168
169
170class Signer:
171 def __init__(
172 self,
173 *,
174 key: str | None = None,
175 sep: str = ":",
176 salt: str | None = None,
177 algorithm: str = "sha256",
178 fallback_keys: list[str] | None = None,
179 ) -> None:
180 self.key = key or settings.SECRET_KEY
181 self.fallback_keys = (
182 fallback_keys
183 if fallback_keys is not None
184 else settings.SECRET_KEY_FALLBACKS
185 )
186 self.sep = sep
187 self.salt = salt or f"{self.__class__.__module__}.{self.__class__.__name__}"
188 self.algorithm = algorithm
189
190 if _SEP_UNSAFE.match(self.sep):
191 raise ValueError(
192 f"Unsafe Signer separator: {sep!r} (cannot be empty or consist of "
193 "only A-z0-9-_=)",
194 )
195
196 def signature(self, value: str, key: str | None = None) -> str:
197 key = key or self.key
198 return base64_hmac(self.salt + "signer", value, key, algorithm=self.algorithm)
199
200 def sign(self, value: str) -> str:
201 return f"{value}{self.sep}{self.signature(value)}"
202
203 def unsign(self, signed_value: str) -> str:
204 if self.sep not in signed_value:
205 raise BadSignature(f'No "{self.sep}" found in value')
206 value, sig = signed_value.rsplit(self.sep, 1)
207 for key in [self.key, *self.fallback_keys]:
208 if hmac.compare_digest(
209 force_bytes(sig), force_bytes(self.signature(value, key))
210 ):
211 return value
212 raise BadSignature(f'Signature "{sig}" does not match')
213
214 def sign_object(
215 self,
216 obj: Any,
217 serializer: type[JSONSerializer] = JSONSerializer,
218 compress: bool = False,
219 ) -> str:
220 """
221 Return URL-safe, hmac signed base64 compressed JSON string.
222
223 If compress is True (not the default), check if compressing using zlib
224 can save some space. Prepend a '.' to signify compression. This is
225 included in the signature, to protect against zip bombs.
226
227 The serializer is expected to return a bytestring.
228 """
229 data = serializer().dumps(obj)
230 # Flag for if it's been compressed or not.
231 is_compressed = False
232
233 if compress:
234 # Avoid zlib dependency unless compress is being used.
235 compressed = zlib.compress(data)
236 if len(compressed) < (len(data) - 1):
237 data = compressed
238 is_compressed = True
239 base64d = b64_encode(data).decode()
240 if is_compressed:
241 base64d = "." + base64d
242 return self.sign(base64d)
243
244 def unsign_object(
245 self,
246 signed_obj: str,
247 serializer: type[JSONSerializer] = JSONSerializer,
248 **kwargs: Any,
249 ) -> Any:
250 # Signer.unsign() returns str but base64 and zlib compression operate
251 # on bytes.
252 base64d = self.unsign(signed_obj, **kwargs).encode()
253 decompress = base64d[:1] == b"."
254 if decompress:
255 # It's compressed; uncompress it first.
256 base64d = base64d[1:]
257 data = b64_decode(base64d)
258 if decompress:
259 data = zlib.decompress(data)
260 return serializer().loads(data)
261
262
263class TimestampSigner:
264 """A signer that includes a timestamp for max_age validation.
265
266 Uses composition rather than inheritance since the interface
267 intentionally differs from Signer (unsign accepts max_age parameter).
268 """
269
270 def __init__(
271 self,
272 *,
273 key: str | None = None,
274 sep: str = ":",
275 salt: str | None = None,
276 algorithm: str = "sha256",
277 fallback_keys: list[str] | None = None,
278 ) -> None:
279 # Compute default salt here to preserve backwards compatibility.
280 # When TimestampSigner inherited from Signer, the default salt was
281 # "plain.signing.TimestampSigner". Now that we use composition,
282 # we must set it explicitly rather than letting Signer compute its own.
283 if salt is None:
284 salt = f"{self.__class__.__module__}.{self.__class__.__name__}"
285 self._signer = Signer(
286 key=key,
287 sep=sep,
288 salt=salt,
289 algorithm=algorithm,
290 fallback_keys=fallback_keys,
291 )
292
293 @property
294 def sep(self) -> str:
295 return self._signer.sep
296
297 def timestamp(self) -> str:
298 return b62_encode(int(time.time()))
299
300 def sign(self, value: str) -> str:
301 value = f"{value}{self.sep}{self.timestamp()}"
302 return self._signer.sign(value)
303
304 def unsign(
305 self, value: str, max_age: int | float | datetime.timedelta | None = None
306 ) -> str:
307 """
308 Retrieve original value and check it wasn't signed more
309 than max_age seconds ago.
310 """
311 result = self._signer.unsign(value)
312 value, timestamp = result.rsplit(self.sep, 1)
313 ts = b62_decode(timestamp)
314 if max_age is not None:
315 if isinstance(max_age, datetime.timedelta):
316 max_age = max_age.total_seconds()
317 # Check timestamp is not older than max_age
318 age = time.time() - ts
319 if age > max_age:
320 raise SignatureExpired(f"Signature age {age} > {max_age} seconds")
321 return value
322
323 def sign_object(
324 self,
325 obj: Any,
326 serializer: type[JSONSerializer] = JSONSerializer,
327 compress: bool = False,
328 ) -> str:
329 """
330 Return URL-safe, hmac signed base64 compressed JSON string.
331
332 If compress is True (not the default), check if compressing using zlib
333 can save some space. Prepend a '.' to signify compression. This is
334 included in the signature, to protect against zip bombs.
335
336 The serializer is expected to return a bytestring.
337 """
338 data = serializer().dumps(obj)
339 is_compressed = False
340
341 if compress:
342 compressed = zlib.compress(data)
343 if len(compressed) < (len(data) - 1):
344 data = compressed
345 is_compressed = True
346 base64d = b64_encode(data).decode()
347 if is_compressed:
348 base64d = "." + base64d
349 return self.sign(base64d)
350
351 def unsign_object(
352 self,
353 signed_obj: str,
354 serializer: type[JSONSerializer] = JSONSerializer,
355 max_age: int | float | datetime.timedelta | None = None,
356 ) -> Any:
357 """Unsign and decode an object, optionally checking max_age."""
358 base64d = self.unsign(signed_obj, max_age=max_age).encode()
359 decompress = base64d[:1] == b"."
360 if decompress:
361 base64d = base64d[1:]
362 data = b64_decode(base64d)
363 if decompress:
364 data = zlib.decompress(data)
365 return serializer().loads(data)