1from __future__ import annotations
2
3from functools import cached_property
4from typing import TYPE_CHECKING, Any
5from urllib.parse import urlparse, urlunparse
6
7from plain.http import (
8 ForbiddenError403,
9 HTTPException,
10 NotFoundError404,
11 QueryDict,
12 RedirectResponse,
13 Response,
14)
15from plain.runtime import settings
16from plain.sessions.views import SessionView
17from plain.urls import reverse
18from plain.utils.cache import patch_cache_control
19from plain.views import View
20
21from .sessions import logout
22from .utils import resolve_url
23
24if TYPE_CHECKING:
25 from app.users.models import User
26
# Impersonation support comes from `plain.admin`, which may not be installed.
# When the import fails the name is bound to `None`, so callers must test it
# for truthiness before calling it.
try:
    from plain.admin.impersonate import get_request_impersonator
except ImportError:
    get_request_impersonator: Any = None
31
# Names exported by a star-import of this module.
__all__ = [
    "AuthView",
    "LoginRequired",
    "LogoutView",
    "redirect_to_login",
]
38
39
class LoginRequired(HTTPException):
    """Signal that the requested view needs an authenticated user.

    `AuthView.check_auth` raises this when no user is attached to the
    request. It is an `HTTPException` with `status_code = 401`, so generic
    handlers (logging, APIs, MCP) can treat it as a 401 by default. For HTML
    views, `AuthView.handle_exception` converts it into a redirect to the
    login page instead.
    """

    status_code = 401

    def __init__(self, login_url: str | None, redirect_field_name: str = "next"):
        # Query-string key under which the post-login destination is passed.
        self.redirect_field_name = redirect_field_name
        # The caller decides which login URL applies. `None` means "no login
        # page configured", in which case the failure is rendered as a 403.
        self.login_url = login_url
55
56
class AuthView(SessionView):
    """Session view that can require a logged-in or admin user.

    Subclasses opt in through the class attributes below. The access check
    runs in `before_request`; a failed login check is converted into a
    redirect to the login page by `handle_exception`.
    """

    # Set to True to require an authenticated user.
    login_required = False
    admin_required = False  # Implies login_required
    # Handed to `LoginRequired`; a falsy value turns the failure into a 403.
    login_url = settings.AUTH_LOGIN_URL

    @cached_property
    def user(self) -> User | None:
        """Return the user for this request, looked up once per view instance."""
        # Imported inside the property rather than at module import time.
        from .requests import get_request_user

        return get_request_user(self.request)

    def get_template_context(self) -> dict:
        """Extend the parent template context with the current `user`."""
        template_context = super().get_template_context()
        template_context["user"] = self.user
        return template_context

    def check_auth(self) -> None:
        """Enforce `login_required` / `admin_required` for this request.

        Raises:
            LoginRequired: access is restricted and there is no user.
            ForbiddenError403: an impersonation is active and the
                impersonator is not an admin.
            NotFoundError404: an admin page is requested by a non-admin user.
        """
        if not (self.login_required or self.admin_required):
            # The view did not opt in to any restriction.
            return

        if not self.user:
            raise LoginRequired(login_url=self.login_url)

        if not self.admin_required:
            # Being logged in is all that was asked for.
            return

        # From here on the user is known to be authenticated. The
        # impersonation helper is None when `plain.admin` is unavailable.
        impersonator = (
            get_request_impersonator(self.request)
            if get_request_impersonator
            else None
        )
        if impersonator:
            # While impersonating, the impersonator's admin status decides
            # access, not the impersonated user's. A non-admin impersonator
            # is unlikely but can be configured.
            if impersonator.is_admin:
                return
            raise ForbiddenError403("You do not have permission to access this page.")

        if not self.user.is_admin:
            # Answer with a 404 so admin URLs are not revealed to non-admins.
            raise NotFoundError404()

    def before_request(self) -> None:
        """Run the access check before the request is handled."""
        self.check_auth()

    def handle_exception(self, exc: Exception) -> Response:
        """Turn `LoginRequired` into a login redirect; defer everything else.

        When the exception carries no login URL it is replaced by a
        `ForbiddenError403` and passed to the parent handler.
        """
        if not isinstance(exc, LoginRequired):
            return super().handle_exception(exc)

        if not exc.login_url:
            # No login page configured: let the parent chain render a 403.
            return super().handle_exception(ForbiddenError403("Login required"))

        absolute_uri = self.request.build_absolute_uri()
        login_target = reverse(exc.login_url)

        login_parts = urlparse(login_target)
        current_parts = urlparse(absolute_uri)
        scheme_matches = (
            not login_parts.scheme or login_parts.scheme == current_parts.scheme
        )
        netloc_matches = (
            not login_parts.netloc or login_parts.netloc == current_parts.netloc
        )

        # When the login page shares scheme and host with the current page,
        # the path alone is enough for "next"; otherwise keep the full URI.
        if scheme_matches and netloc_matches:
            next_url = self.request.get_full_path()
        else:
            next_url = absolute_uri

        return redirect_to_login(next_url, login_target, exc.redirect_field_name)

    def after_response(self, response: Response) -> Response:
        """Mark responses for an authenticated user as at least `private`."""
        processed = super().after_response(response)
        if self.user:
            # Default to private caching whenever a user is present.
            patch_cache_control(processed, private=True)
        return processed
133
134
class LogoutView(View):
    """Handle a logout POST and redirect the browser afterwards.

    Override `redirect_url` on a subclass to send users somewhere other than
    the site root after logging out.
    """

    # Destination of the redirect issued after `logout`; defaults to the
    # site root, which was previously hard-coded.
    redirect_url = "/"

    def post(self) -> RedirectResponse:
        """Call `logout` for this request, then redirect to `redirect_url`."""
        logout(self.request)
        return RedirectResponse(self.redirect_url)
139
140
def redirect_to_login(
    next: str, login_url: str | None = None, redirect_field_name: str = "next"
) -> RedirectResponse:
    """Build a redirect to the login page that remembers where to return.

    Args:
        next: URL stored in the login page's query string.
        login_url: Login page to redirect to; falls back to
            `settings.AUTH_LOGIN_URL` when falsy. Passed through `resolve_url`.
        redirect_field_name: Query parameter that receives `next`. When
            falsy, the login URL's query string is left untouched.

    Returns:
        A `RedirectResponse` to the login URL, created with
        `allow_external=True`.
    """
    target = urlparse(resolve_url(login_url or settings.AUTH_LOGIN_URL))

    if redirect_field_name:
        # Keep any query parameters already on the login URL and set the
        # "next" parameter alongside them.
        params = QueryDict(target.query, mutable=True)
        params[redirect_field_name] = next
        target = target._replace(query=params.urlencode(safe="/"))

    return RedirectResponse(str(urlunparse(target)), allow_external=True)