1from __future__ import annotations
2
3import hmac
4from datetime import datetime
5from typing import TYPE_CHECKING, Any
6
7from plain import signing
8from plain.auth import get_user_model
9from plain.auth.sessions import login as auth_login
10from plain.auth.sessions import update_session_auth_hash
11from plain.auth.views import AuthView
12from plain.exceptions import BadRequest
13from plain.forms import BaseForm
14from plain.http import (
15 ResponseRedirect,
16)
17from plain.urls import reverse
18from plain.utils.cache import add_never_cache_headers
19from plain.utils.encoding import force_bytes
20from plain.views import CreateView, FormView
21
22from .forms import (
23 PasswordChangeForm,
24 PasswordLoginForm,
25 PasswordResetForm,
26 PasswordSetForm,
27 PasswordSignupForm,
28)
29
30if TYPE_CHECKING:
31 from plain.http import Response
32 from plain.models import Model
33
34
class PasswordForgotView(FormView):
    """Form view that kicks off a password reset.

    When the submitted form is valid, the form's ``save`` is called with
    ``generate_password_reset_url`` so it can build a reset link per user.
    Subclasses are expected to set ``reset_confirm_url_name``.
    """

    form_class = PasswordResetForm
    # URL name handed to ``reverse`` when building the reset link.
    reset_confirm_url_name: str

    def generate_password_reset_token(self, user: Any) -> str:
        """Return a signed token describing the user's current id/email/password.

        The token is produced with ``signing.dumps`` under the
        ``"password-reset"`` salt, with compression enabled.
        """
        # NOTE(review): the payload carries the *hashed* password. If
        # ``signing.dumps`` only signs (and does not encrypt) the payload,
        # that hash is readable by anyone holding the link -- confirm.
        payload = {
            "id": user.id,
            "email": user.email,
            "password": user.password,  # Hashed password
            "timestamp": datetime.now().timestamp(),  # Makes each token unique
        }
        return signing.dumps(payload, salt="password-reset", compress=True)

    def generate_password_reset_url(self, user: Any) -> str:
        """Return the absolute reset URL for ``user`` with the token as ``?token=``."""
        reset_token = self.generate_password_reset_token(user)
        confirm_path = reverse(self.reset_confirm_url_name)
        relative_url = f"{confirm_path}?token={reset_token}"
        return self.request.build_absolute_uri(relative_url)

    def form_valid(self, form: PasswordResetForm) -> Response:  # type: ignore[override]
        """Save the form, giving it the URL builder, then defer to the parent."""
        form.save(generate_reset_url=self.generate_password_reset_url)
        return super().form_valid(form)
61
62
class PasswordResetView(AuthView, FormView):
    """Form view that lets a user set a new password from a reset token.

    The token arrives as a ``token`` query parameter, is moved into the
    session, and is validated from there on each form build.
    """

    form_class = PasswordSetForm
    reset_token_max_age = 60 * 60  # 1 hour
    _reset_token_session_key = "_password_reset_token"

    def check_password_reset_token(self, token: str) -> Model | None:
        """Return the user a reset token belongs to, or ``None`` if it is invalid.

        A token is rejected when its signature is bad or expired, when the
        user cannot be looked up, or when the user's stored password or
        email no longer matches the values recorded in the token.
        """
        try:
            payload = signing.loads(
                token, salt="password-reset", max_age=self.reset_token_max_age
            )
        except (signing.SignatureExpired, signing.BadSignature):
            return None

        user_model = get_user_model()
        try:
            user = user_model.query.get(id=payload["id"])
        except (TypeError, ValueError, OverflowError, user_model.DoesNotExist):
            return None

        # A change to the password (compared as hashes, never raw values) or
        # to the email since the token was issued invalidates the token.
        # Password is checked first; the first mismatch stops the check.
        for field_name in ("password", "email"):
            current_value = force_bytes(getattr(user, field_name))
            signed_value = force_bytes(payload[field_name])
            if not hmac.compare_digest(current_value, signed_value):
                return None

        return user

    def get(self) -> Response:
        """Handle GET: redirect logged-in users, stash a URL token, or show the form."""
        if self.user:
            # Already logged in -- nothing to reset here.
            if self.success_url:
                return ResponseRedirect(str(self.success_url))
            return ResponseRedirect("/")

        token = self.request.query_params.get("token", "")
        if not token:
            return super().get()

        # Move the token out of the URL and into the session, then redirect
        # to the same path without query parameters. This avoids the
        # possibility of leaking the token in the HTTP Referer header.
        self.session[self._reset_token_session_key] = token
        redirect = ResponseRedirect(self.request.path)
        add_never_cache_headers(redirect)
        return redirect

    def get_user(self) -> Model:
        """Return the user for the session's reset token.

        Raises:
            BadRequest: if the session holds no token, or the token fails
                validation (in which case it is also removed from the session).
        """
        session_key = self._reset_token_session_key
        stored_token = self.session.get(session_key, "")
        if not stored_token:
            # Without a token in the session there is nothing to validate.
            raise BadRequest("No password reset token found.")

        user = self.check_password_reset_token(stored_token)
        if user:
            return user

        # Drop the invalid token so it is not checked again.
        del self.session[session_key]
        raise BadRequest("Password reset token is no longer valid.")

    def get_form_kwargs(self) -> dict:
        """Extend the parent's form kwargs with the token's ``user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.get_user()
        return form_kwargs

    def form_valid(self, form: PasswordSetForm) -> Response:  # type: ignore[override]
        """Save the new password and remove the reset token from the session."""
        form.save()
        del self.session[self._reset_token_session_key]
        # If you wanted, you could log in the user here so they don't have to
        # go through the log in form again.
        return super().form_valid(form)
142
143
class PasswordChangeView(AuthView, FormView):
    """Form view that changes the password of ``self.user``."""

    # Swap in PasswordSetForm to allow setting a new password without
    # confirming the old one.
    form_class = PasswordChangeForm

    def get_form_kwargs(self) -> dict:
        """Extend the parent's form kwargs with the current ``user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.user
        return form_kwargs

    def form_valid(self, form: PasswordChangeForm) -> Response:  # type: ignore[override]
        """Save the new password, refresh the session auth hash, then continue."""
        form.save()
        # Updating the password logs out all other sessions for the user
        # except the current one.
        update_session_auth_hash(self.request, form.user)
        return super().form_valid(form)
160
161
class PasswordLoginView(AuthView, FormView):
    """Form view that logs a user in with ``PasswordLoginForm``."""

    form_class = PasswordLoginForm
    success_url = "/"

    def get(self) -> Response:
        """Show the login form, or redirect to ``success_url`` if already logged in."""
        if not self.user:
            return super().get()

        # A logged-in user has no use for the login form.
        return ResponseRedirect(self.success_url)

    def form_valid(self, form: PasswordLoginForm) -> Response:  # type: ignore[override]
        """Log in the user returned by the form, then defer to the parent."""
        authenticated_user = form.get_user()
        auth_login(self.request, authenticated_user)

        return super().form_valid(form)
178
179
class PasswordSignupView(CreateView):
    """Create view for signing up with ``PasswordSignupForm``."""

    form_class = PasswordSignupForm
    success_url = "/"

    def form_valid(self, form: BaseForm) -> Response:
        """Defer to the parent's ``form_valid``; this is a hook for overrides."""
        # Disabled example -- log the new user in and redirect:
        # auth_login(self.request, form.save())

        return super().form_valid(form)