1from __future__ import annotations
2
3import time
4import zlib
5from typing import Any
6
7from plain.signing import (
8 JSONSerializer,
9 SignatureExpired,
10 Signer,
11 b62_decode,
12 b62_encode,
13 b64_decode,
14 b64_encode,
15)
16
17
18class ExpiringSigner(Signer):
19 """A signer with an embedded expiration (vs max age unsign)"""
20
21 def sign(self, value: str, expires_in: int) -> str:
22 timestamp = b62_encode(int(time.time() + expires_in))
23 value = f"{value}{self.sep}{timestamp}"
24 return super().sign(value)
25
26 def unsign(self, value: str) -> str:
27 """
28 Retrieve original value and check the expiration hasn't passed.
29 """
30 result = super().unsign(value)
31 value, timestamp = result.rsplit(self.sep, 1)
32 timestamp = b62_decode(timestamp)
33 if timestamp < time.time():
34 raise SignatureExpired("Signature expired")
35 return value
36
37 def sign_object(
38 self,
39 obj: Any,
40 serializer: type = JSONSerializer,
41 compress: bool = False,
42 expires_in: int | None = None,
43 ) -> str:
44 """
45 Return URL-safe, hmac signed base64 compressed JSON string.
46
47 If compress is True (not the default), check if compressing using zlib
48 can save some space. Prepend a '.' to signify compression. This is
49 included in the signature, to protect against zip bombs.
50
51 The serializer is expected to return a bytestring.
52 """
53 data = serializer().dumps(obj)
54 # Flag for if it's been compressed or not.
55 is_compressed = False
56
57 if compress:
58 # Avoid zlib dependency unless compress is being used.
59 compressed = zlib.compress(data)
60 if len(compressed) < (len(data) - 1):
61 data = compressed
62 is_compressed = True
63 base64d = b64_encode(data).decode()
64 if is_compressed:
65 base64d = "." + base64d
66 return self.sign(base64d, expires_in)
67
68 def unsign_object(self, signed_obj: str, serializer: type = JSONSerializer) -> Any:
69 # Signer.unsign() returns str but base64 and zlib compression operate
70 # on bytes.
71 base64d = self.unsign(signed_obj).encode()
72 decompress = base64d[:1] == b"."
73 if decompress:
74 # It's compressed; uncompress it first.
75 base64d = base64d[1:]
76 data = b64_decode(base64d)
77 if decompress:
78 data = zlib.decompress(data)
79 return serializer().loads(data)
80
81
82def dumps(
83 obj: Any,
84 key: str | None = None,
85 salt: str = "plain.loginlink",
86 serializer: type = JSONSerializer,
87 compress: bool = False,
88 expires_in: int | None = None,
89) -> str:
90 """
91 Return URL-safe, hmac signed base64 compressed JSON string. If key is
92 None, use settings.SECRET_KEY instead. The hmac algorithm is the default
93 Signer algorithm.
94
95 If compress is True (not the default), check if compressing using zlib can
96 save some space. Prepend a '.' to signify compression. This is included
97 in the signature, to protect against zip bombs.
98
99 Salt can be used to namespace the hash, so that a signed string is
100 only valid for a given namespace. Leaving this at the default
101 value or re-using a salt value across different parts of your
102 application without good cause is a security risk.
103
104 The serializer is expected to return a bytestring.
105 """
106 return ExpiringSigner(key=key, salt=salt).sign_object(
107 obj, serializer=serializer, compress=compress, expires_in=expires_in
108 )
109
110
111def loads(
112 s: str,
113 key: str | None = None,
114 salt: str = "plain.loginlink",
115 serializer: type = JSONSerializer,
116 fallback_keys: list[str] | None = None,
117) -> Any:
118 """
119 Reverse of dumps(), raise BadSignature if signature fails.
120
121 The serializer is expected to accept a bytestring.
122 """
123 return ExpiringSigner(
124 key=key, salt=salt, fallback_keys=fallback_keys
125 ).unsign_object(
126 s,
127 serializer=serializer,
128 )