1from __future__ import annotations
  2
  3import hmac
  4from datetime import datetime
  5from typing import TYPE_CHECKING, Any
  6
  7from app.users.models import User
  8
  9from plain.auth.sessions import login as auth_login
 10from plain.auth.sessions import update_session_auth_hash
 11from plain.auth.views import AuthView
 12from plain.forms import BaseForm
 13from plain.http import (
 14    BadRequestError400,
 15    RedirectResponse,
 16)
 17from plain.signing import BadSignature, SignatureExpired, TimestampSigner
 18from plain.urls import reverse
 19from plain.utils.cache import add_never_cache_headers
 20from plain.utils.encoding import force_bytes
 21from plain.views import CreateView, FormView
 22
 23from .forms import (
 24    PasswordChangeForm,
 25    PasswordLoginForm,
 26    PasswordResetForm,
 27    PasswordSetForm,
 28    PasswordSignupForm,
 29)
 30
 31if TYPE_CHECKING:
 32    from plain.http import Response
 33
 34
 35class PasswordForgotView(FormView[PasswordResetForm]):
 36    form_class = PasswordResetForm
 37    reset_confirm_url_name: str
 38
 39    def generate_password_reset_token(self, user: Any) -> str:
 40        return TimestampSigner(salt="password-reset").sign_object(
 41            {
 42                "id": user.id,
 43                "email": user.email,
 44                "password": user.password,  # Hashed password
 45                "timestamp": datetime.now().timestamp(),  # Makes each token unique
 46            },
 47            compress=True,
 48        )
 49
 50    def generate_password_reset_url(self, user: Any) -> str:
 51        token = self.generate_password_reset_token(user)
 52        url = reverse(self.reset_confirm_url_name) + f"?token={token}"
 53        return self.request.build_absolute_uri(url)
 54
 55    def form_valid(self, form: PasswordResetForm) -> Response:
 56        form.save(
 57            generate_reset_url=self.generate_password_reset_url,
 58        )
 59        return super().form_valid(form)
 60
 61
 62class PasswordResetView(AuthView, FormView[PasswordSetForm]):
 63    form_class = PasswordSetForm
 64    reset_token_max_age = 60 * 60  # 1 hour
 65    _reset_token_session_key = "_password_reset_token"
 66
 67    def check_password_reset_token(self, token: str) -> User | None:
 68        max_age = self.reset_token_max_age
 69
 70        try:
 71            data = TimestampSigner(salt="password-reset").unsign_object(
 72                token, max_age=max_age
 73            )
 74        except SignatureExpired:
 75            return None
 76        except BadSignature:
 77            return None
 78
 79        try:
 80            user = User.query.get(id=data["id"])
 81        except (TypeError, ValueError, OverflowError, User.DoesNotExist):
 82            return None
 83
 84        # If the password has changed since the token was generated, the token is invalid.
 85        # (These are the hashed passwords, not the raw passwords.)
 86        if not hmac.compare_digest(
 87            force_bytes(user.password),
 88            force_bytes(data["password"]),
 89        ):
 90            return None
 91
 92        # If the email has changed since the token was generated, the token is invalid.
 93        if not hmac.compare_digest(force_bytes(user.email), force_bytes(data["email"])):
 94            return None
 95
 96        return user
 97
 98    def get(self) -> Response:
 99        if self.user:
100            # Redirect if the user is already logged in
101            return RedirectResponse(str(self.success_url) if self.success_url else "/")
102
103        # Tokens are initially passed as GET parameters and we
104        # immediately store them in the session and remove it from the URL.
105        if token := self.request.query_params.get("token", ""):
106            # Store the token in the session and redirect to the
107            # password reset form at a URL without the token. That
108            # avoids the possibility of leaking the token in the
109            # HTTP Referer header.
110            self.session[self._reset_token_session_key] = token
111            # Redirect to the path itself, without the GET parameters
112            response = RedirectResponse(self.request.path)
113            add_never_cache_headers(response)
114            return response
115
116        return super().get()
117
118    def get_user(self) -> User:
119        session_token = self.session.get(self._reset_token_session_key, "")
120        if not session_token:
121            # No token in the session, so we can't check the password reset token.
122            raise BadRequestError400("No password reset token found.")
123
124        user = self.check_password_reset_token(session_token)
125        if not user:
126            # Remove it from the session if it is invalid.
127            del self.session[self._reset_token_session_key]
128            raise BadRequestError400("Password reset token is no longer valid.")
129
130        return user
131
132    def get_form_kwargs(self) -> dict:
133        kwargs = super().get_form_kwargs()
134        kwargs["user"] = self.get_user()
135        return kwargs
136
137    def form_valid(self, form: PasswordSetForm) -> Response:
138        form.save()
139        del self.session[self._reset_token_session_key]
140        # If you wanted, you could log in the user here so they don't have to
141        # go through the log in form again.
142        return super().form_valid(form)
143
144
class PasswordChangeView(AuthView, FormView[PasswordChangeForm]):
    """Form view that changes the password of the current user (``self.user``)."""

    # Swap in PasswordSetForm to allow setting a new password without
    # asking for the old one first.
    form_class = PasswordChangeForm

    def get_form_kwargs(self) -> dict:
        """Return the parent's form kwargs with ``user`` set to ``self.user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs.update(user=self.user)
        return form_kwargs

    def form_valid(self, form: PasswordChangeForm) -> Response:
        """Save the new password and refresh the session auth hash."""
        form.save()
        # Changing the password logs out every other session of this user;
        # refreshing the auth hash keeps the current one alive.
        update_session_auth_hash(self.request, form.user)
        response = super().form_valid(form)
        return response
161
162
class PasswordLoginView(AuthView, FormView[PasswordLoginForm]):
    """Form view that logs a user in through ``PasswordLoginForm``."""

    form_class = PasswordLoginForm
    success_url = "/"

    def get(self) -> Response:
        """Show the login form, unless a user is already logged in."""
        if not self.user:
            return super().get()

        # Already logged in: skip the form and go straight to the success URL.
        return RedirectResponse(self.success_url)

    def form_valid(self, form: PasswordLoginForm) -> Response:
        """Log in the user resolved by the form, then continue as usual."""
        request = self.request
        authenticated_user = form.get_user()
        auth_login(request, authenticated_user)

        return super().form_valid(form)
179
180
class PasswordSignupView(CreateView):
    """Create view that signs up a new user through ``PasswordSignupForm``."""

    form_class = PasswordSignupForm
    success_url = "/"

    def form_valid(self, form: BaseForm) -> Response:
        """Delegate to the parent implementation.

        This override exists as a hook: automatic login after signup is
        intentionally left disabled (see the commented-out call below).
        """
        # Disabled by default -- log the user in and redirect:
        # auth_login(self.request, form.save())

        response = super().form_valid(form)
        return response