1import logging
2import string
3from datetime import datetime, timedelta
4
5from plain import signing
6from plain.runtime import settings
7from plain.utils import timezone
8from plain.utils.crypto import get_random_string
9
# Session keys are restricted to lowercase letters and digits so that they
# remain unambiguous on backends that store them on case-insensitive file
# systems.
VALID_KEY_CHARS = "".join((string.ascii_lowercase, string.digits))
13
14
class CreateError(Exception):
    """
    Internal, uniform exception type for callers to catch around save().

    See the docstring of SessionBase.save() for when it is raised.
    """
22
23
class UpdateError(Exception):
    """
    Raised when Plain attempts to update a session that has been deleted.
    """
30
31
32class SessionBase:
33 """
34 Base class for all Session classes.
35 """
36
37 __not_given = object()
38
39 def __init__(self, session_key=None):
40 self._session_key = session_key
41 self.accessed = False
42 self.modified = False
43
44 def __contains__(self, key):
45 return key in self._session
46
47 def __getitem__(self, key):
48 return self._session[key]
49
50 def __setitem__(self, key, value):
51 self._session[key] = value
52 self.modified = True
53
54 def __delitem__(self, key):
55 del self._session[key]
56 self.modified = True
57
58 @property
59 def key_salt(self):
60 return "plain.sessions." + self.__class__.__qualname__
61
62 def get(self, key, default=None):
63 return self._session.get(key, default)
64
65 def pop(self, key, default=__not_given):
66 self.modified = self.modified or key in self._session
67 args = () if default is self.__not_given else (default,)
68 return self._session.pop(key, *args)
69
70 def setdefault(self, key, value):
71 if key in self._session:
72 return self._session[key]
73 else:
74 self.modified = True
75 self._session[key] = value
76 return value
77
78 def encode(self, session_dict):
79 "Return the given session dictionary serialized and encoded as a string."
80 return signing.dumps(
81 session_dict,
82 salt=self.key_salt,
83 compress=True,
84 )
85
86 def decode(self, session_data):
87 try:
88 return signing.loads(session_data, salt=self.key_salt)
89 except signing.BadSignature:
90 logger = logging.getLogger("plain.security.SuspiciousSession")
91 logger.warning("Session data corrupted")
92 except Exception:
93 # ValueError, unpickling exceptions. If any of these happen, just
94 # return an empty dictionary (an empty session).
95 pass
96 return {}
97
98 def update(self, dict_):
99 self._session.update(dict_)
100 self.modified = True
101
102 def has_key(self, key):
103 return key in self._session
104
105 def keys(self):
106 return self._session.keys()
107
108 def values(self):
109 return self._session.values()
110
111 def items(self):
112 return self._session.items()
113
114 def clear(self):
115 # To avoid unnecessary persistent storage accesses, we set up the
116 # internals directly (loading data wastes time, since we are going to
117 # set it to an empty dict anyway).
118 self._session_cache = {}
119 self.accessed = True
120 self.modified = True
121
122 def is_empty(self):
123 "Return True when there is no session_key and the session is empty."
124 try:
125 return not self._session_key and not self._session_cache
126 except AttributeError:
127 return True
128
129 def _get_new_session_key(self):
130 "Return session key that isn't being used."
131 while True:
132 session_key = get_random_string(32, VALID_KEY_CHARS)
133 if not self.exists(session_key):
134 return session_key
135
136 def _get_or_create_session_key(self):
137 if self._session_key is None:
138 self._session_key = self._get_new_session_key()
139 return self._session_key
140
141 def _validate_session_key(self, key):
142 """
143 Key must be truthy and at least 8 characters long. 8 characters is an
144 arbitrary lower bound for some minimal key security.
145 """
146 return key and len(key) >= 8
147
148 def _get_session_key(self):
149 return self.__session_key
150
151 def _set_session_key(self, value):
152 """
153 Validate session key on assignment. Invalid values will set to None.
154 """
155 if self._validate_session_key(value):
156 self.__session_key = value
157 else:
158 self.__session_key = None
159
160 session_key = property(_get_session_key)
161 _session_key = property(_get_session_key, _set_session_key)
162
163 def _get_session(self, no_load=False):
164 """
165 Lazily load session from storage (unless "no_load" is True, when only
166 an empty dict is stored) and store it in the current instance.
167 """
168 self.accessed = True
169 try:
170 return self._session_cache
171 except AttributeError:
172 if self.session_key is None or no_load:
173 self._session_cache = {}
174 else:
175 self._session_cache = self.load()
176 return self._session_cache
177
178 _session = property(_get_session)
179
180 def get_session_cookie_age(self):
181 return settings.SESSION_COOKIE_AGE
182
183 def get_expiry_age(self, **kwargs):
184 """Get the number of seconds until the session expires.
185
186 Optionally, this function accepts `modification` and `expiry` keyword
187 arguments specifying the modification and expiry of the session.
188 """
189 try:
190 modification = kwargs["modification"]
191 except KeyError:
192 modification = timezone.now()
193 # Make the difference between "expiry=None passed in kwargs" and
194 # "expiry not passed in kwargs", in order to guarantee not to trigger
195 # self.load() when expiry is provided.
196 try:
197 expiry = kwargs["expiry"]
198 except KeyError:
199 expiry = self.get("_session_expiry")
200
201 if not expiry: # Checks both None and 0 cases
202 return self.get_session_cookie_age()
203 if not isinstance(expiry, datetime | str):
204 return expiry
205 if isinstance(expiry, str):
206 expiry = datetime.fromisoformat(expiry)
207 delta = expiry - modification
208 return delta.days * 86400 + delta.seconds
209
210 def get_expiry_date(self, **kwargs):
211 """Get session the expiry date (as a datetime object).
212
213 Optionally, this function accepts `modification` and `expiry` keyword
214 arguments specifying the modification and expiry of the session.
215 """
216 try:
217 modification = kwargs["modification"]
218 except KeyError:
219 modification = timezone.now()
220 # Same comment as in get_expiry_age
221 try:
222 expiry = kwargs["expiry"]
223 except KeyError:
224 expiry = self.get("_session_expiry")
225
226 if isinstance(expiry, datetime):
227 return expiry
228 elif isinstance(expiry, str):
229 return datetime.fromisoformat(expiry)
230 expiry = expiry or self.get_session_cookie_age()
231 return modification + timedelta(seconds=expiry)
232
233 def set_expiry(self, value):
234 """
235 Set a custom expiration for the session. ``value`` can be an integer,
236 a Python ``datetime`` or ``timedelta`` object or ``None``.
237
238 If ``value`` is an integer, the session will expire after that many
239 seconds of inactivity. If set to ``0`` then the session will expire on
240 browser close.
241
242 If ``value`` is a ``datetime`` or ``timedelta`` object, the session
243 will expire at that specific future time.
244
245 If ``value`` is ``None``, the session uses the global session expiry
246 policy.
247 """
248 if value is None:
249 # Remove any custom expiration for this session.
250 try:
251 del self["_session_expiry"]
252 except KeyError:
253 pass
254 return
255 if isinstance(value, timedelta):
256 value = timezone.now() + value
257 if isinstance(value, datetime):
258 value = value.isoformat()
259 self["_session_expiry"] = value
260
261 def get_expire_at_browser_close(self):
262 """
263 Return ``True`` if the session is set to expire when the browser
264 closes, and ``False`` if there's an expiry date. Use
265 ``get_expiry_date()`` or ``get_expiry_age()`` to find the actual expiry
266 date/age, if there is one.
267 """
268 if (expiry := self.get("_session_expiry")) is None:
269 return settings.SESSION_EXPIRE_AT_BROWSER_CLOSE
270 return expiry == 0
271
272 def flush(self):
273 """
274 Remove the current session data from the database and regenerate the
275 key.
276 """
277 self.clear()
278 self.delete()
279 self._session_key = None
280
281 def cycle_key(self):
282 """
283 Create a new session key, while retaining the current session data.
284 """
285 data = self._session
286 key = self.session_key
287 self.create()
288 self._session_cache = data
289 if key:
290 self.delete(key)
291
292 # Methods that child classes must implement.
293
294 def exists(self, session_key):
295 """
296 Return True if the given session_key already exists.
297 """
298 raise NotImplementedError(
299 "subclasses of SessionBase must provide an exists() method"
300 )
301
302 def create(self):
303 """
304 Create a new session instance. Guaranteed to create a new object with
305 a unique key and will have saved the result once (with empty data)
306 before the method returns.
307 """
308 raise NotImplementedError(
309 "subclasses of SessionBase must provide a create() method"
310 )
311
312 def save(self, must_create=False):
313 """
314 Save the session data. If 'must_create' is True, create a new session
315 object (or raise CreateError). Otherwise, only update an existing
316 object and don't create one (raise UpdateError if needed).
317 """
318 raise NotImplementedError(
319 "subclasses of SessionBase must provide a save() method"
320 )
321
322 def delete(self, session_key=None):
323 """
324 Delete the session data under this key. If the key is None, use the
325 current session key value.
326 """
327 raise NotImplementedError(
328 "subclasses of SessionBase must provide a delete() method"
329 )
330
331 def load(self):
332 """
333 Load the session data and return a dictionary.
334 """
335 raise NotImplementedError(
336 "subclasses of SessionBase must provide a load() method"
337 )
338
339 @classmethod
340 def clear_expired(cls):
341 """
342 Remove expired sessions from the session store.
343
344 If this operation isn't possible on a given backend, it should raise
345 NotImplementedError. If it isn't necessary, because the backend has
346 a built-in expiration mechanism, it should be a no-op.
347 """
348 raise NotImplementedError("This backend does not support clear_expired().")