1from __future__ import annotations
2
3from functools import cached_property
4from typing import Any
5from urllib.parse import urlparse, urlunparse
6
7from plain.exceptions import PermissionDenied
8from plain.http import (
9 Http404,
10 QueryDict,
11 ResponseBase,
12 ResponseRedirect,
13)
14from plain.runtime import settings
15from plain.sessions.views import SessionView
16from plain.urls import reverse
17from plain.utils.cache import patch_cache_control
18from plain.views import View
19
20from .sessions import logout
21from .utils import resolve_url
22
23try:
24 from plain.admin.impersonate import get_request_impersonator
25except ImportError:
26 get_request_impersonator: Any = None
27
28
29class LoginRequired(Exception):
30 def __init__(self, login_url: str | None = None, redirect_field_name: str = "next"):
31 self.login_url = login_url or settings.AUTH_LOGIN_URL
32 self.redirect_field_name = redirect_field_name
33
34
class AuthView(SessionView):
    """Session-backed view that can require a logged-in or admin user.

    Subclasses opt in by setting ``login_required`` and/or
    ``admin_required``; ``login_url`` controls where unauthenticated
    requests are redirected.
    """

    login_required = False
    admin_required = False  # Implies login_required
    login_url = settings.AUTH_LOGIN_URL

    @cached_property
    def user(self) -> Any | None:
        """Return the user for this request (looked up once, then cached)."""
        # Imported lazily, as in the original implementation.
        from .requests import get_request_user

        return get_request_user(self.request)

    def get_template_context(self) -> dict:
        """Extend the parent template context with the ``user`` key."""
        template_context = super().get_template_context()
        template_context["user"] = self.user
        return template_context

    def check_auth(self) -> None:
        """
        Enforce the view's login/admin requirements.

        Raises:
            LoginRequired: a login or admin is required and there is no user.
            PermissionDenied: an active impersonator is not an admin.
            Http404: admin is required, nobody is impersonating, and the
                user is not an admin.
        """
        if not (self.login_required or self.admin_required):
            return None

        if not self.user:
            raise LoginRequired(login_url=self.login_url)

        if not self.admin_required:
            return None

        # From here on the user is known to be authenticated.
        # Impersonation support is optional: the helper is None when the
        # admin package could not be imported.
        impersonator = (
            get_request_impersonator(self.request)
            if get_request_impersonator
            else None
        )
        if impersonator:
            # Impersonators should be able to view admin pages while
            # impersonating. A non-admin impersonator is unlikely, but it
            # can be configured.
            if not impersonator.is_admin:
                raise PermissionDenied(
                    "You do not have permission to access this page."
                )
            return None

        if not self.user.is_admin:
            # Show a 404 so we don't expose admin urls to non-admin users
            raise Http404()

    def get_response(self) -> ResponseBase:
        """Run the auth checks, then produce the parent view's response.

        A ``LoginRequired`` failure becomes a redirect to the login page
        (or ``PermissionDenied`` when the view has no ``login_url``).
        Responses for authenticated users are marked ``private`` for caching.
        """
        try:
            self.check_auth()
        except LoginRequired as exc:
            if not self.login_url:
                raise PermissionDenied("Login required")

            # Ideally this could be handled elsewhere... like PermissionDenied
            absolute_uri = self.request.build_absolute_uri()
            resolved_login_url = reverse(exc.login_url)

            login_parts = urlparse(resolved_login_url)
            current_parts = urlparse(absolute_uri)
            scheme_matches = (
                not login_parts.scheme or login_parts.scheme == current_parts.scheme
            )
            netloc_matches = (
                not login_parts.netloc or login_parts.netloc == current_parts.netloc
            )

            # When the login url shares the scheme and net location with the
            # current request, the path alone is enough for the "next" url.
            if scheme_matches and netloc_matches:
                next_url = self.request.get_full_path()
            else:
                next_url = absolute_uri

            return redirect_to_login(
                next_url,
                resolved_login_url,
                exc.redirect_field_name,
            )

        response = super().get_response()

        # Make sure it at least has private as a default
        if self.user:
            patch_cache_control(response, private=True)

        return response
115
116
class LogoutView(View):
    """Log the current request out in response to a POST."""

    def post(self) -> ResponseRedirect:
        """Call ``logout`` for the request and redirect to ``/``."""
        logout(self.request)
        response = ResponseRedirect("/")
        return response
121
122
def redirect_to_login(
    next: str, login_url: str | None = None, redirect_field_name: str = "next"
) -> ResponseRedirect:
    """
    Build a redirect to the login page that remembers where to return.

    The login URL (``settings.AUTH_LOGIN_URL`` when ``login_url`` is empty)
    is resolved, and ``next`` is stored in its query string under
    ``redirect_field_name``. A falsy ``redirect_field_name`` leaves the
    query string untouched.
    """
    target = resolve_url(login_url or settings.AUTH_LOGIN_URL)

    scheme, netloc, path, params, query, fragment = urlparse(target)
    if redirect_field_name:
        query_params = QueryDict(query, mutable=True)
        query_params[redirect_field_name] = next
        # Keep "/" unescaped so the return path stays readable.
        query = query_params.urlencode(safe="/")

    final_url = urlunparse([scheme, netloc, path, params, query, fragment])
    return ResponseRedirect(str(final_url))