1from __future__ import annotations
2
3import hmac
4from collections.abc import Generator
5from typing import TYPE_CHECKING
6
7from plain.exceptions import ImproperlyConfigured
8from plain.postgres import models_registry
9from plain.runtime import settings
10from plain.sessions import get_request_session
11from plain.utils.crypto import salted_hmac
12from plain.utils.encoding import force_bytes
13
14from .requests import get_request_user, set_request_user
15
16if TYPE_CHECKING:
17 from plain.http import Request
18 from plain.postgres import Model
19
# Session key under which login() stores the authenticated user's id.
_USER_ID_SESSION_KEY = "_auth_user_id"
# Session key under which login() stores the user's session auth hash
# (an empty string when settings.AUTH_USER_SESSION_HASH_FIELD is not set).
_USER_HASH_SESSION_KEY = "_auth_user_hash"
22
23
def get_session_auth_hash(user: Model) -> str:
    """
    Compute the session auth hash for ``user``.

    The result is the hex digest of an HMAC over the user attribute named by
    ``settings.AUTH_USER_SESSION_HASH_FIELD``; no explicit secret override is
    supplied.
    """
    auth_hash = _get_session_auth_hash(user, secret=None)
    return auth_hash
29
30
def update_session_auth_hash(request: Request, user: Model) -> None:
    """
    Keep the current session valid after the user's hashed field changes.

    Changing the hashed field (for example a password) invalidates every
    session whose stored hash no longer matches. Call this with the current
    request and the updated user so that the session from which the change
    was made is not logged out as well.

    The session key is always cycled. The stored hash is refreshed only when
    a session hash field is configured and ``user`` is the user attached to
    the request.

    Args:
        request: The request whose session should be updated.
        user: The already-updated user the new hash is derived from.
    """
    session = get_request_session(request)
    session.cycle_key()

    if not settings.AUTH_USER_SESSION_HASH_FIELD:
        # No hash field configured: login() stores "" and get_user() skips
        # verification, so there is nothing to refresh. Computing the hash
        # here would fail on getattr() with an empty field name.
        return

    if get_request_user(request) == user:
        session[_USER_HASH_SESSION_KEY] = get_session_auth_hash(user)
45
46
def _get_session_auth_fallback_hash(user: Model) -> Generator[str]:
    """
    Lazily yield the session auth hash of ``user`` computed with each secret
    in ``settings.SECRET_KEY_FALLBACKS``, in the order they are listed.
    """
    yield from (
        _get_session_auth_hash(user, secret=old_secret)
        for old_secret in settings.SECRET_KEY_FALLBACKS
    )
50
51
def _get_session_auth_hash(user: Model, secret: str | None = None) -> str:
    """
    Return the hex digest of a salted SHA-256 HMAC over the user attribute
    named by ``settings.AUTH_USER_SESSION_HASH_FIELD``.

    ``secret`` is forwarded unchanged to ``salted_hmac``; ``None`` leaves the
    choice of key to that helper.
    """
    hashed_value = getattr(user, settings.AUTH_USER_SESSION_HASH_FIELD)
    mac = salted_hmac(
        "plain.auth.get_session_auth_hash",
        hashed_value,
        secret=secret,
        algorithm="sha256",
    )
    return mac.hexdigest()
60
61
def login(request: Request, user: Model) -> None:
    """
    Record ``user`` as the authenticated user for this request and session.

    The user's id and session auth hash are written to the session so later
    requests do not need to reauthenticate, and the user is attached to the
    request. Data stored during an anonymous session is kept; a session that
    belonged to a different user, or whose stored hash is stale, is flushed.
    """
    session = get_request_session(request)

    # An empty hash means "no hash field configured".
    session_auth_hash = (
        get_session_auth_hash(user) if settings.AUTH_USER_SESSION_HASH_FIELD else ""
    )

    if _USER_ID_SESSION_KEY not in session:
        # Anonymous session: rotate the session key while keeping its data,
        # which protects against session fixation.
        session.cycle_key()
    elif int(session[_USER_ID_SESSION_KEY]) != user.id:
        # The session is tied to another authenticated user; start from a
        # new, empty session instead of reusing theirs.
        session.flush()
    elif session_auth_hash and not hmac.compare_digest(
        force_bytes(session.get(_USER_HASH_SESSION_KEY, "")),
        force_bytes(session_auth_hash),
    ):
        # Same user, but the stored hash no longer matches the current one
        # (most likely a password change): reset the session.
        session.flush()

    session[_USER_ID_SESSION_KEY] = user.id
    session[_USER_HASH_SESSION_KEY] = session_auth_hash
    set_request_user(request, user)
96
97
def logout(request: Request) -> None:
    """
    Log the current user out.

    Flushes the session associated with ``request`` and then sets the
    request's user to ``None``.
    """
    get_request_session(request).flush()
    set_request_user(request, None)
108
109
def get_user_model() -> type[Model]:
    """
    Return the User model that is active in this project.

    The model is looked up in the models registry using
    ``settings.AUTH_USER_MODEL`` (with ``require_ready=False``).

    Raises:
        ImproperlyConfigured: If the setting is not of the form
            'package_label.model_name' (the lookup raised ``ValueError``) or
            refers to a model that is not installed (``LookupError``). The
            original exception is chained as the cause.
    """
    try:
        return models_registry.get_model(settings.AUTH_USER_MODEL, require_ready=False)
    except ValueError as err:
        raise ImproperlyConfigured(
            "AUTH_USER_MODEL must be of the form 'package_label.model_name'"
        ) from err
    except LookupError as err:
        raise ImproperlyConfigured(
            f"AUTH_USER_MODEL refers to model '{settings.AUTH_USER_MODEL}' that has not been installed"
        ) from err
124
125
def get_user(request: Request) -> Model | None:
    """
    Look up the user referenced by the session attached to ``request``.

    Returns ``None`` when the session holds no user id, when no user with
    that id exists, or when the session auth hash fails verification (in
    which case the session is flushed as well).
    """
    session = get_request_session(request)

    if _USER_ID_SESSION_KEY not in session:
        return None

    UserModel = get_user_model()
    try:
        user = UserModel.query.get(id=session[_USER_ID_SESSION_KEY])
    except UserModel.DoesNotExist:
        return None

    # Without a configured hash field there is nothing further to verify.
    if not settings.AUTH_USER_SESSION_HASH_FIELD:
        return user

    # When a hash field (like password) is configured, the hash stored at
    # login must still match the hash of the field's current value. A
    # mismatch (i.e. the password changed) invalidates the session.
    stored_hash = session.get(_USER_HASH_SESSION_KEY)
    if not stored_hash:
        # A session without a stored hash can never be verified.
        session.flush()
        return None

    stored_hash_bytes = force_bytes(stored_hash)
    current_hash = get_session_auth_hash(user)
    if hmac.compare_digest(stored_hash_bytes, force_bytes(current_hash)):
        return user

    # The current secret did not verify the session; try the fallback
    # secrets and stop at the first one that matches.
    for fallback_hash in _get_session_auth_fallback_hash(user):
        if hmac.compare_digest(stored_hash_bytes, force_bytes(fallback_hash)):
            # Re-key the session and store the hash made with the current
            # secret so the next request verifies directly.
            session.cycle_key()
            session[_USER_HASH_SESSION_KEY] = current_hash
            return user

    session.flush()
    return None