1from __future__ import annotations
2
3import hmac
4from datetime import datetime
5from typing import TYPE_CHECKING, Any
6
7from plain import signing
8from plain.auth import get_user_model
9from plain.auth.sessions import login as auth_login
10from plain.auth.sessions import update_session_auth_hash
11from plain.auth.views import AuthViewMixin
12from plain.exceptions import BadRequest
13from plain.http import (
14 ResponseRedirect,
15)
16from plain.urls import reverse
17from plain.utils.cache import add_never_cache_headers
18from plain.utils.encoding import force_bytes
19from plain.views import CreateView, FormView
20
21from .forms import (
22 PasswordChangeForm,
23 PasswordLoginForm,
24 PasswordResetForm,
25 PasswordSetForm,
26 PasswordSignupForm,
27)
28
29if TYPE_CHECKING:
30 from plain.http import Response
31 from plain.models import Model
32
33
class PasswordForgotView(FormView):
    """Form view for the "forgot password" step of a password reset.

    On a valid submission the form's ``save`` is called with a callable that
    builds an absolute, token-bearing reset URL for a given user.
    """

    form_class = PasswordResetForm
    # URL pattern name passed to ``reverse`` when building the reset link.
    # Only annotated here; no value is assigned in this class.
    reset_confirm_url_name: str

    def generate_password_reset_token(self, user: Any) -> str:
        """Return a signed, compressed token built from *user*'s current fields.

        The payload carries the user's id, email and (hashed) password, plus
        the current timestamp, and is signed with the ``"password-reset"``
        salt.
        """
        payload = {
            "id": user.id,
            "email": user.email,
            "password": user.password,  # Hashed password
            "timestamp": datetime.now().timestamp(),  # Makes each token unique
        }
        return signing.dumps(payload, salt="password-reset", compress=True)

    def generate_password_reset_url(self, user: Any) -> str:
        """Return the absolute reset URL for *user*, with the token as ``?token=``."""
        token = self.generate_password_reset_token(user)
        query_string = f"?token={token}"
        relative_url = reverse(self.reset_confirm_url_name) + query_string
        return self.request.build_absolute_uri(relative_url)

    def form_valid(self, form: PasswordResetForm) -> Response:
        """Save the form, giving it the reset-URL builder, then defer to the parent."""
        form.save(generate_reset_url=self.generate_password_reset_url)
        return super().form_valid(form)
60
61
class PasswordResetView(AuthViewMixin, FormView):
    """Form view that lets a user set a new password using a signed reset token.

    The token arrives as a ``token`` query parameter, is moved into the
    session, and is re-validated every time the form kwargs are built.
    """

    form_class = PasswordSetForm
    reset_token_max_age = 60 * 60  # 1 hour
    _reset_token_session_key = "_password_reset_token"

    def check_password_reset_token(self, token: str) -> Model | None:
        """Return the user that *token* was issued for, or ``None`` if it is invalid.

        A token is rejected when its signature is bad or expired, when the
        user lookup fails, or when the user's stored password or email no
        longer matches the value recorded in the token.
        """
        try:
            payload = signing.loads(
                token,
                salt="password-reset",
                max_age=self.reset_token_max_age,
            )
        except (signing.SignatureExpired, signing.BadSignature):
            return None

        user_model = get_user_model()
        try:
            user = user_model.query.get(id=payload["id"])
        except (TypeError, ValueError, OverflowError, user_model.DoesNotExist):
            return None

        # The token is only valid while the user's password and email are the
        # same as when it was generated. Checked in that order, in constant
        # time. (The password values are the hashed passwords, not the raw
        # passwords.)
        for field_name in ("password", "email"):
            current_value = force_bytes(getattr(user, field_name))
            token_value = force_bytes(payload[field_name])
            if not hmac.compare_digest(current_value, token_value):
                return None

        return user

    def get(self) -> Response:
        """Handle GET: redirect logged-in users, stash an incoming token, or render."""
        if self.user:
            # Already logged in, so skip the reset form entirely.
            return ResponseRedirect(self.success_url)

        token = self.request.query_params.get("token", "")
        if not token:
            return super().get()

        # The token first shows up as a GET parameter. Move it into the
        # session and redirect to the same path without the query string, so
        # the token cannot leak through the HTTP Referer header.
        self.session[self._reset_token_session_key] = token
        redirect = ResponseRedirect(self.request.path)
        add_never_cache_headers(redirect)
        return redirect

    def get_user(self) -> Model:
        """Return the user for the token held in the session.

        Raises:
            BadRequest: if the session holds no token, or the token does not
                validate (in which case it is also removed from the session).
        """
        token = self.session.get(self._reset_token_session_key, "")
        if not token:
            # Nothing stored in the session, so there is nothing to validate.
            raise BadRequest("No password reset token found.")

        user = self.check_password_reset_token(token)
        if user:
            return user

        # Drop the invalid token so it is not checked again.
        del self.session[self._reset_token_session_key]
        raise BadRequest("Password reset token is no longer valid.")

    def get_form_kwargs(self) -> dict:
        """Return the parent's form kwargs with the token's user added as ``user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.get_user()
        return form_kwargs

    def form_valid(self, form: PasswordSetForm) -> Response:
        """Save the form, remove the token from the session, and defer to the parent."""
        form.save()
        del self.session[self._reset_token_session_key]
        # The user is not logged in here; that could be added at this point
        # to spare them a trip through the login form.
        return super().form_valid(form)
141
142
class PasswordChangeView(AuthViewMixin, FormView):
    """Form view for changing the password of the current user (``self.user``)."""

    # Swap in PasswordSetForm to set a new password without asking for the
    # old one first.
    form_class = PasswordChangeForm

    def get_form_kwargs(self) -> dict:
        """Return the parent's form kwargs with ``self.user`` added as ``user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.user
        return form_kwargs

    def form_valid(self, form: PasswordChangeForm) -> Response:
        """Save the form, refresh the session auth hash, and defer to the parent."""
        form.save()
        # Changing the password logs out every other session for this user;
        # refreshing the auth hash keeps the current one alive.
        update_session_auth_hash(self.request, form.user)
        return super().form_valid(form)
159
160
class PasswordLoginView(AuthViewMixin, FormView):
    """Form view that logs a user in from a submitted ``PasswordLoginForm``."""

    form_class = PasswordLoginForm
    success_url = "/"

    def get(self) -> Response:
        """Render the login form, or redirect to ``success_url`` if already logged in."""
        if not self.user:
            return super().get()

        # A logged-in user has no use for the login form.
        return ResponseRedirect(self.success_url)

    def form_valid(self, form: PasswordLoginForm) -> Response:
        """Log in the user returned by the form, then defer to the parent."""
        authenticated_user = form.get_user()
        auth_login(self.request, authenticated_user)

        return super().form_valid(form)
177
178
class PasswordSignupView(CreateView):
    """Create view for signing up a new user with ``PasswordSignupForm``."""

    form_class = PasswordSignupForm
    success_url = "/"

    def form_valid(self, form: PasswordSignupForm) -> Response:
        """Defer to the parent's handling; the new user is not logged in here."""
        # Auto-login after signup is left disabled. The call that would do it:
        # auth_login(self.request, form.save())

        return super().form_valid(form)