Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import hmac
  4from datetime import datetime
  5from typing import TYPE_CHECKING, Any
  6
  7from plain import signing
  8from plain.auth import get_user_model
  9from plain.auth.sessions import login as auth_login
 10from plain.auth.sessions import update_session_auth_hash
 11from plain.auth.views import AuthView
 12from plain.exceptions import BadRequest
 13from plain.forms import BaseForm
 14from plain.http import (
 15    ResponseRedirect,
 16)
 17from plain.urls import reverse
 18from plain.utils.cache import add_never_cache_headers
 19from plain.utils.encoding import force_bytes
 20from plain.views import CreateView, FormView
 21
 22from .forms import (
 23    PasswordChangeForm,
 24    PasswordLoginForm,
 25    PasswordResetForm,
 26    PasswordSetForm,
 27    PasswordSignupForm,
 28)
 29
 30if TYPE_CHECKING:
 31    from plain.http import Response
 32    from plain.models import Model
 33
 34
 35class PasswordForgotView(FormView):
 36    form_class = PasswordResetForm
 37    reset_confirm_url_name: str
 38
 39    def generate_password_reset_token(self, user: Any) -> str:
 40        return signing.dumps(
 41            {
 42                "id": user.id,
 43                "email": user.email,
 44                "password": user.password,  # Hashed password
 45                "timestamp": datetime.now().timestamp(),  # Makes each token unique
 46            },
 47            salt="password-reset",
 48            compress=True,
 49        )
 50
 51    def generate_password_reset_url(self, user: Any) -> str:
 52        token = self.generate_password_reset_token(user)
 53        url = reverse(self.reset_confirm_url_name) + f"?token={token}"
 54        return self.request.build_absolute_uri(url)
 55
 56    def form_valid(self, form: PasswordResetForm) -> Response:  # type: ignore[override]
 57        form.save(
 58            generate_reset_url=self.generate_password_reset_url,
 59        )
 60        return super().form_valid(form)
 61
 62
 63class PasswordResetView(AuthView, FormView):
 64    form_class = PasswordSetForm
 65    reset_token_max_age = 60 * 60  # 1 hour
 66    _reset_token_session_key = "_password_reset_token"
 67
 68    def check_password_reset_token(self, token: str) -> Model | None:
 69        max_age = self.reset_token_max_age
 70
 71        try:
 72            data = signing.loads(token, salt="password-reset", max_age=max_age)
 73        except signing.SignatureExpired:
 74            return None
 75        except signing.BadSignature:
 76            return None
 77
 78        UserModel = get_user_model()
 79        try:
 80            user = UserModel.query.get(id=data["id"])
 81        except (TypeError, ValueError, OverflowError, UserModel.DoesNotExist):
 82            return None
 83
 84        # If the password has changed since the token was generated, the token is invalid.
 85        # (These are the hashed passwords, not the raw passwords.)
 86        if not hmac.compare_digest(
 87            force_bytes(user.password), force_bytes(data["password"])
 88        ):
 89            return None
 90
 91        # If the email has changed since the token was generated, the token is invalid.
 92        if not hmac.compare_digest(force_bytes(user.email), force_bytes(data["email"])):
 93            return None
 94
 95        return user
 96
 97    def get(self) -> Response:
 98        if self.user:
 99            # Redirect if the user is already logged in
100            return ResponseRedirect(str(self.success_url) if self.success_url else "/")
101
102        # Tokens are initially passed as GET parameters and we
103        # immediately store them in the session and remove it from the URL.
104        if token := self.request.query_params.get("token", ""):
105            # Store the token in the session and redirect to the
106            # password reset form at a URL without the token. That
107            # avoids the possibility of leaking the token in the
108            # HTTP Referer header.
109            self.session[self._reset_token_session_key] = token
110            # Redirect to the path itself, without the GET parameters
111            response = ResponseRedirect(self.request.path)
112            add_never_cache_headers(response)
113            return response
114
115        return super().get()
116
117    def get_user(self) -> Model:
118        session_token = self.session.get(self._reset_token_session_key, "")
119        if not session_token:
120            # No token in the session, so we can't check the password reset token.
121            raise BadRequest("No password reset token found.")
122
123        user = self.check_password_reset_token(session_token)
124        if not user:
125            # Remove it from the session if it is invalid.
126            del self.session[self._reset_token_session_key]
127            raise BadRequest("Password reset token is no longer valid.")
128
129        return user
130
131    def get_form_kwargs(self) -> dict:
132        kwargs = super().get_form_kwargs()
133        kwargs["user"] = self.get_user()
134        return kwargs
135
136    def form_valid(self, form: PasswordSetForm) -> Response:  # type: ignore[override]
137        form.save()
138        del self.session[self._reset_token_session_key]
139        # If you wanted, you could log in the user here so they don't have to
140        # go through the log in form again.
141        return super().form_valid(form)
142
143
class PasswordChangeView(AuthView, FormView):
    """Form view through which the current user changes their own password."""

    # Swap in PasswordSetForm to accept a new password without asking
    # for the old one first.
    form_class = PasswordChangeForm

    def get_form_kwargs(self) -> dict:
        """Extend the inherited form kwargs with the current ``user``."""
        form_kwargs = super().get_form_kwargs()
        form_kwargs.update(user=self.user)
        return form_kwargs

    def form_valid(self, form: PasswordChangeForm) -> Response:  # type: ignore[override]
        """Save the new password and refresh this session's auth hash."""
        form.save()
        # A password change logs out every other session of this user;
        # refreshing the auth hash keeps the current one signed in.
        update_session_auth_hash(self.request, form.user)
        return super().form_valid(form)
160
161
class PasswordLoginView(AuthView, FormView):
    """Form view that logs a user in with their password."""

    form_class = PasswordLoginForm
    success_url = "/"

    def get(self) -> Response:
        """Show the login form, or redirect when a user is already logged in."""
        if not self.user:
            return super().get()

        # Already logged in: skip the form entirely.
        return ResponseRedirect(self.success_url)

    def form_valid(self, form: PasswordLoginForm) -> Response:  # type: ignore[override]
        """Log in the user supplied by the form, then redirect."""
        authenticated_user = form.get_user()
        auth_login(self.request, authenticated_user)
        return super().form_valid(form)
178
179
class PasswordSignupView(CreateView):
    """Create view for signing up with a password."""

    form_class = PasswordSignupForm
    success_url = "/"

    def form_valid(self, form: BaseForm) -> Response:
        """Defer entirely to the inherited handling of a valid form."""
        # The new user is not logged in here. A disabled alternative is kept
        # for reference:
        #     auth_login(self.request, form.save())
        return super().form_valid(form)
187
188        return super().form_valid(form)