v0.139.0
  1from __future__ import annotations
  2
  3from functools import cached_property
  4from typing import TYPE_CHECKING, Any
  5from urllib.parse import urlparse, urlunparse
  6
  7from plain.http import (
  8    ForbiddenError403,
  9    HTTPException,
 10    NotFoundError404,
 11    QueryDict,
 12    RedirectResponse,
 13    Response,
 14)
 15from plain.runtime import settings
 16from plain.sessions.views import SessionView
 17from plain.urls import reverse
 18from plain.utils.cache import patch_cache_control
 19from plain.views import View
 20
 21from .sessions import logout
 22from .utils import resolve_url
 23
 24if TYPE_CHECKING:
 25    from app.users.models import User
 26
 27try:
 28    from plain.admin.impersonate import get_request_impersonator
 29except ImportError:
 30    get_request_impersonator: Any = None
 31
 32__all__ = [
 33    "AuthView",
 34    "LoginRequired",
 35    "LogoutView",
 36    "redirect_to_login",
 37]
 38
 39
 40class LoginRequired(HTTPException):
 41    """Raised by `check_auth` when a view requires a logged-in user.
 42
 43    Subclasses an HTTPException so generic handlers (logging, APIs, MCP)
 44    treat it as a 401 by default. HTML views rely on `AuthView.handle_exception`
 45    to render a redirect to the configured login page instead.
 46    """
 47
 48    status_code = 401
 49
 50    def __init__(self, login_url: str | None, redirect_field_name: str = "next"):
 51        # Caller is responsible for resolving `login_url` — pass `None`
 52        # to signal "no login page configured, render as 403 instead".
 53        self.login_url = login_url
 54        self.redirect_field_name = redirect_field_name
 55
 56
 57class AuthView(SessionView):
 58    login_required = False
 59    admin_required = False  # Implies login_required
 60    login_url = settings.AUTH_LOGIN_URL
 61
 62    @cached_property
 63    def user(self) -> User | None:
 64        """Get the authenticated user for this request."""
 65        from .requests import get_request_user
 66
 67        return get_request_user(self.request)
 68
 69    def get_template_context(self) -> dict:
 70        """Add user and impersonator to template context."""
 71        context = super().get_template_context()
 72        context["user"] = self.user
 73        return context
 74
 75    def check_auth(self) -> None:
 76        """Raise LoginRequired, ForbiddenError403, or NotFoundError404 when access is denied."""
 77        if not self.login_required and not self.admin_required:
 78            return None
 79
 80        if not self.user:
 81            raise LoginRequired(login_url=self.login_url)
 82
 83        if self.admin_required:
 84            # At this point, we know user is authenticated (from check above)
 85            # Check if impersonation is active
 86            if get_request_impersonator:
 87                if impersonator := get_request_impersonator(self.request):
 88                    # Impersonators should be able to view admin pages while impersonating.
 89                    # There's probably never a case where an impersonator isn't admin, but it can be configured.
 90                    if not impersonator.is_admin:
 91                        raise ForbiddenError403(
 92                            "You do not have permission to access this page."
 93                        )
 94                    return
 95
 96            if not self.user.is_admin:
 97                # Show a 404 so we don't expose admin urls to non-admin users
 98                raise NotFoundError404()
 99
100    def before_request(self) -> None:
101        self.check_auth()
102
103    def handle_exception(self, exc: Exception) -> Response:
104        if isinstance(exc, LoginRequired):
105            if not exc.login_url:
106                # No configured login page — treat as a plain 403 and let
107                # the surrounding view's `handle_exception` chain render it.
108                return super().handle_exception(ForbiddenError403("Login required"))
109
110            path = self.request.build_absolute_uri()
111            resolved_login_url = reverse(exc.login_url)
112            # If the login url is the same scheme and net location then
113            # use the path as the "next" url.
114            login_scheme, login_netloc = urlparse(resolved_login_url)[:2]
115            current_scheme, current_netloc = urlparse(path)[:2]
116            if (not login_scheme or login_scheme == current_scheme) and (
117                not login_netloc or login_netloc == current_netloc
118            ):
119                path = self.request.get_full_path()
120            return redirect_to_login(
121                path,
122                resolved_login_url,
123                exc.redirect_field_name,
124            )
125        return super().handle_exception(exc)
126
127    def after_response(self, response: Response) -> Response:
128        response = super().after_response(response)
129        if self.user:
130            # Make sure it at least has private as a default
131            patch_cache_control(response, private=True)
132        return response
133
134
class LogoutView(View):
    """POST-only view: call `logout` for the request, then redirect to "/"."""

    def post(self) -> RedirectResponse:
        logout(self.request)
        home_redirect = RedirectResponse("/")
        return home_redirect
138        return RedirectResponse("/")
139
140
def redirect_to_login(
    next: str, login_url: str | None = None, redirect_field_name: str = "next"
) -> RedirectResponse:
    """
    Build a redirect to the login page that carries the 'next' page along.

    `login_url` falls back to `settings.AUTH_LOGIN_URL` when empty. Unless
    `redirect_field_name` is falsy, `next` is set under that name in the
    login URL's query string, replacing any existing value for that key.
    """
    target = resolve_url(login_url or settings.AUTH_LOGIN_URL)
    parts = urlparse(target)

    if redirect_field_name:
        # Start from the login URL's own query so its other parameters survive.
        params = QueryDict(parts.query, mutable=True)
        params[redirect_field_name] = next
        parts = parts._replace(query=params.urlencode(safe="/"))

    # The login page may live on another host, so external targets are allowed.
    return RedirectResponse(str(urlunparse(parts)), allow_external=True)
155    return RedirectResponse(str(urlunparse(login_url_parts)), allow_external=True)