1from plain.csrf.middleware import rotate_token
2from plain.exceptions import ImproperlyConfigured
3from plain.models import models_registry
4from plain.runtime import settings
5from plain.utils.crypto import constant_time_compare, salted_hmac
6
7USER_ID_SESSION_KEY = "_auth_user_id"
8USER_HASH_SESSION_KEY = "_auth_user_hash"
9
10
11def _get_user_id_from_session(request):
12 # This value in the session is always serialized to a string, so we need
13 # to convert it back to Python whenever we access it.
14 return get_user_model()._meta.pk.to_python(request.session[USER_ID_SESSION_KEY])
15
16
17def get_session_auth_hash(user):
18 """
19 Return an HMAC of the password field.
20 """
21 return _get_session_auth_hash(user)
22
23
24def update_session_auth_hash(request, user):
25 """
26 Updating a user's password (for example) logs out all sessions for the user.
27
28 Take the current request and the updated user object from which the new
29 session hash will be derived and update the session hash appropriately to
30 prevent a password change from logging out the session from which the
31 password was changed.
32 """
33 request.session.cycle_key()
34 if request.user == user:
35 request.session[USER_HASH_SESSION_KEY] = get_session_auth_hash(user)
36
37
38def get_session_auth_fallback_hash(user):
39 for fallback_secret in settings.SECRET_KEY_FALLBACKS:
40 yield _get_session_auth_hash(user, secret=fallback_secret)
41
42
43def _get_session_auth_hash(user, secret=None):
44 key_salt = "plain.auth.get_session_auth_hash"
45 return salted_hmac(
46 key_salt,
47 getattr(user, settings.AUTH_USER_SESSION_HASH_FIELD),
48 secret=secret,
49 algorithm="sha256",
50 ).hexdigest()
51
52
53def login(request, user):
54 """
55 Persist a user id and a backend in the request. This way a user doesn't
56 have to reauthenticate on every request. Note that data set during
57 the anonymous session is retained when the user logs in.
58 """
59 if settings.AUTH_USER_SESSION_HASH_FIELD:
60 session_auth_hash = get_session_auth_hash(user)
61 else:
62 session_auth_hash = ""
63
64 if USER_ID_SESSION_KEY in request.session:
65 if _get_user_id_from_session(request) != user.pk:
66 # To avoid reusing another user's session, create a new, empty
67 # session if the existing session corresponds to a different
68 # authenticated user.
69 request.session.flush()
70 elif session_auth_hash and not constant_time_compare(
71 request.session.get(USER_HASH_SESSION_KEY, ""), session_auth_hash
72 ):
73 # If the session hash does not match the current hash, reset the
74 # session. Most likely this means the password was changed.
75 request.session.flush()
76 else:
77 # Invalidate the current session key and generate a new one to enhance security,
78 # typically done after user login to prevent session fixation attacks.
79 request.session.cycle_key()
80
81 request.session[USER_ID_SESSION_KEY] = user._meta.pk.value_to_string(user)
82 request.session[USER_HASH_SESSION_KEY] = session_auth_hash
83 if hasattr(request, "user"):
84 request.user = user
85 rotate_token(request)
86
87
88def logout(request):
89 """
90 Remove the authenticated user's ID from the request and flush their session
91 data.
92 """
93 # Dispatch the signal before the user is logged out so the receivers have a
94 # chance to find out *who* logged out.
95 request.session.flush()
96 if hasattr(request, "user"):
97 request.user = None
98
99
100def get_user_model():
101 """
102 Return the User model that is active in this project.
103 """
104 try:
105 return models_registry.get_model(settings.AUTH_USER_MODEL, require_ready=False)
106 except ValueError:
107 raise ImproperlyConfigured(
108 "AUTH_USER_MODEL must be of the form 'package_label.model_name'"
109 )
110 except LookupError:
111 raise ImproperlyConfigured(
112 f"AUTH_USER_MODEL refers to model '{settings.AUTH_USER_MODEL}' that has not been installed"
113 )
114
115
116def get_user(request):
117 """
118 Return the user model instance associated with the given request session.
119 If no user is retrieved, return None.
120 """
121 if USER_ID_SESSION_KEY not in request.session:
122 return None
123
124 user_id = _get_user_id_from_session(request)
125
126 UserModel = get_user_model()
127 try:
128 user = UserModel._default_manager.get(pk=user_id)
129 except UserModel.DoesNotExist:
130 return None
131
132 # If the user models defines a specific field to also hash and compare
133 # (like password), then we verify that the hash of that field is still
134 # the same as when the session was created.
135 #
136 # If it has changed (i.e. password changed), then the session
137 # is no longer valid and cleared out.
138 if settings.AUTH_USER_SESSION_HASH_FIELD:
139 session_hash = request.session.get(USER_HASH_SESSION_KEY)
140 if not session_hash:
141 session_hash_verified = False
142 else:
143 session_auth_hash = get_session_auth_hash(user)
144 session_hash_verified = constant_time_compare(
145 session_hash, session_auth_hash
146 )
147 if not session_hash_verified:
148 # If the current secret does not verify the session, try
149 # with the fallback secrets and stop when a matching one is
150 # found.
151 if session_hash and any(
152 constant_time_compare(session_hash, fallback_auth_hash)
153 for fallback_auth_hash in get_session_auth_fallback_hash(user)
154 ):
155 request.session.cycle_key()
156 request.session[USER_HASH_SESSION_KEY] = session_auth_hash
157 else:
158 request.session.flush()
159 user = None
160
161 return user