1from __future__ import annotations
2
3import hmac
4from collections.abc import Generator
5from typing import TYPE_CHECKING, Any
6
7from plain.exceptions import ImproperlyConfigured
8from plain.models import models_registry
9from plain.runtime import settings
10from plain.sessions import get_request_session
11from plain.utils.crypto import salted_hmac
12from plain.utils.encoding import force_bytes
13
14from .requests import get_request_user, set_request_user
15
16if TYPE_CHECKING:
17 from plain.http import Request
18
# Session key under which login() stores the authenticated user's id.
USER_ID_SESSION_KEY = "_auth_user_id"
# Session key under which the user's session auth hash is stored; get_user()
# compares it against a freshly computed hash.
USER_HASH_SESSION_KEY = "_auth_user_hash"
21
22
def get_session_auth_hash(user: Any) -> str:
    """
    Return the session auth hash for ``user``.

    The hash is the hex digest computed by ``_get_session_auth_hash`` over
    the user attribute named by ``settings.AUTH_USER_SESSION_HASH_FIELD``.
    No explicit secret is supplied here (``secret=None`` is forwarded).
    """
    session_auth_hash = _get_session_auth_hash(user, secret=None)
    return session_auth_hash
28
29
def update_session_auth_hash(request: Request, user: Any) -> None:
    """
    Keep the current session valid after the user's hashed field changes.

    Updating a user's password (for example) logs out all sessions for the
    user, because the hash stored in each session no longer matches. Take
    the current request and the updated user object, derive the new session
    hash from it, and store that hash in the session so the session from
    which the change was made stays logged in.

    The session key is always cycled. The stored hash is only rewritten when
    the user attached to the request equals ``user``.
    """
    session = get_request_session(request)
    session.cycle_key()

    if get_request_user(request) == user:
        # Mirror login(): hashing is optional. When no hash field is
        # configured there is no attribute to read from the user, so store
        # the same empty hash that login() stores instead of raising.
        if settings.AUTH_USER_SESSION_HASH_FIELD:
            session[USER_HASH_SESSION_KEY] = get_session_auth_hash(user)
        else:
            session[USER_HASH_SESSION_KEY] = ""
44
45
def get_session_auth_fallback_hash(user: Any) -> Generator[str, None, None]:
    """
    Yield the session auth hash of ``user`` for each fallback secret.

    One hash is produced lazily per entry of
    ``settings.SECRET_KEY_FALLBACKS``, in the order the setting lists them.
    """
    yield from (
        _get_session_auth_hash(user, secret=old_secret)
        for old_secret in settings.SECRET_KEY_FALLBACKS
    )
49
50
def _get_session_auth_hash(user: Any, secret: str | None = None) -> str:
    """
    Return the hex digest of a salted SHA-256 HMAC over the user's hash field.

    The value hashed is the attribute of ``user`` named by
    ``settings.AUTH_USER_SESSION_HASH_FIELD``. ``secret`` is forwarded to
    ``salted_hmac`` unchanged.
    """
    hashed_value = getattr(user, settings.AUTH_USER_SESSION_HASH_FIELD)
    mac = salted_hmac(
        "plain.auth.get_session_auth_hash",
        hashed_value,
        secret=secret,
        algorithm="sha256",
    )
    return mac.hexdigest()
59
60
def login(request: Request, user: Any) -> None:
    """
    Persist the user's id and session auth hash in the session and attach
    the user to the request, so the user doesn't have to reauthenticate on
    every request. Note that data set during the anonymous session is
    retained when the user logs in.
    """
    session = get_request_session(request)

    # Hashing is optional: with no hash field configured an empty hash is stored.
    session_auth_hash = (
        get_session_auth_hash(user) if settings.AUTH_USER_SESSION_HASH_FIELD else ""
    )

    if USER_ID_SESSION_KEY not in session:
        # No authenticated user in this session yet: invalidate the current
        # session key and generate a new one, which is typically done after
        # login to prevent session fixation attacks.
        session.cycle_key()
    else:
        # Start from a new, empty session when the existing one either
        # belongs to a different authenticated user, or carries a hash that
        # no longer matches the current one (most likely the password was
        # changed). The hash is only compared when the user id matches.
        different_user = int(session[USER_ID_SESSION_KEY]) != user.id
        if different_user or (
            session_auth_hash
            and not hmac.compare_digest(
                force_bytes(session.get(USER_HASH_SESSION_KEY, "")),
                force_bytes(session_auth_hash),
            )
        ):
            session.flush()

    session[USER_ID_SESSION_KEY] = user.id
    session[USER_HASH_SESSION_KEY] = session_auth_hash
    set_request_user(request, user)
95
96
def logout(request: Request) -> None:
    """
    Flush the request's session data and clear the user attached to the
    request.
    """
    # Flush the session first, then detach the user from the request.
    get_request_session(request).flush()
    set_request_user(request, None)
107
108
def get_user_model() -> type[Any]:
    """
    Return the User model that is active in this project.

    The model is looked up in ``models_registry`` using
    ``settings.AUTH_USER_MODEL``. A ``ValueError`` or ``LookupError`` from
    that lookup is re-raised as ``ImproperlyConfigured``, with the original
    exception chained as its cause.
    """
    try:
        return models_registry.get_model(settings.AUTH_USER_MODEL, require_ready=False)
    except ValueError as err:
        # Chain explicitly so the traceback shows the registry error as the cause.
        raise ImproperlyConfigured(
            "AUTH_USER_MODEL must be of the form 'package_label.model_name'"
        ) from err
    except LookupError as err:
        raise ImproperlyConfigured(
            f"AUTH_USER_MODEL refers to model '{settings.AUTH_USER_MODEL}' that has not been installed"
        ) from err
123
124
def get_user(request: Request) -> Any | None:
    """
    Return the user model instance associated with the given request session.
    If no user is retrieved, return None.

    When ``settings.AUTH_USER_SESSION_HASH_FIELD`` is set, the hash stored in
    the session must also match the user's current session auth hash (or one
    computed with a fallback secret); otherwise the session is flushed and
    None is returned.
    """
    session = get_request_session(request)
    if USER_ID_SESSION_KEY not in session:
        return None

    UserModel = get_user_model()
    try:
        user = UserModel.query.get(id=session[USER_ID_SESSION_KEY])
    except UserModel.DoesNotExist:
        return None

    # Without a configured hash field there is nothing further to verify.
    if not settings.AUTH_USER_SESSION_HASH_FIELD:
        return user

    # The user model defines a field to hash and compare (like password), so
    # check that its hash is still the same as when the session was created.
    stored_hash = session.get(USER_HASH_SESSION_KEY)
    if not stored_hash:
        # No stored hash means the session cannot be verified at all.
        session.flush()
        return None

    current_hash = get_session_auth_hash(user)
    if hmac.compare_digest(force_bytes(stored_hash), force_bytes(current_hash)):
        return user

    # The current secret does not verify the session: try the fallback
    # secrets and stop at the first one that matches.
    for fallback_hash in get_session_auth_fallback_hash(user):
        if hmac.compare_digest(force_bytes(stored_hash), force_bytes(fallback_hash)):
            # Store the hash computed with the current secret.
            session.cycle_key()
            session[USER_HASH_SESSION_KEY] = current_hash
            return user

    # The hashed field changed (i.e. password changed), so the session is no
    # longer valid and is cleared out.
    session.flush()
    return None