1from __future__ import annotations
2
3from typing import TYPE_CHECKING, Any
4from urllib.parse import quote
5
6from jinja2.runtime import Context
7
8from plain.runtime import settings
9from plain.templates import register_template_extension, register_template_global
10from plain.templates.jinja.extensions import InclusionTagExtension
11
12from .identity import encrypt_identity, sign_render_token
13from .tracing import current_trace
14
15if TYPE_CHECKING:
16 from plain.http import Request
17
18# plain.connect doesn't depend on plain.auth — an app without auth installed
19# falls back to a no-op so callers don't need to special-case the absence.
20try:
21 from plain.auth import get_request_user
22except ImportError:
23
24 def get_request_user(request: Request) -> Any:
25 return None
26
27
28@register_template_extension
29class ConnectPageviewsExtension(InclusionTagExtension):
30 tags = {"connect_pageviews"}
31 template_name = "connect/pageviews.html"
32
33 def get_context(
34 self, context: Context, *args: Any, **kwargs: Any
35 ) -> dict[str, Any]:
36 request = context.get("request")
37 token = settings.CONNECT_PAGEVIEWS_TOKEN
38 secret = str(settings.CONNECT_SECRET_KEY) if token else ""
39 return {
40 "request": request,
41 "connect_pageviews_token": token,
42 "connect_pageviews_url": settings.CONNECT_PAGEVIEWS_URL,
43 "connect_pageviews_identity": _identity_token(request, secret)
44 if token
45 else "",
46 "connect_pageviews_trace_id": current_trace().trace_id if token else "",
47 "connect_pageviews_route": _current_route(request) if token else "",
48 }
49
50
51@register_template_extension
52class ConnectSupportFieldsExtension(InclusionTagExtension):
53 tags = {"connect_support_fields"}
54 template_name = "connect/support_fields.html"
55
56 def get_context(
57 self, context: Context, *args: Any, **kwargs: Any
58 ) -> dict[str, Any]:
59 secret = str(settings.CONNECT_SECRET_KEY)
60 return {
61 "connect_support_identity": _identity_token(context.get("request"), secret),
62 "connect_support_render_token": sign_render_token(secret),
63 }
64
65
66@register_template_global
67def connect_support_url(endpoint_id: str) -> str:
68 """Build the form-action URL for a support endpoint."""
69 base = str(settings.CONNECT_CLOUD_URL).rstrip("/")
70 return f"{base}/forms/{quote(endpoint_id, safe='')}"
71
72
73def _current_route(request: Request | None) -> str:
74 # The matched URL route pattern (e.g. "/blog/<slug>/"), resolved before the
75 # template renders. Lets pageviews aggregate by view instead of by raw URL.
76 # Mirrors the http.route span attribute, including the leading slash.
77 if request is None:
78 return ""
79 resolver_match = request.resolver_match
80 if resolver_match is None or resolver_match.route is None:
81 return ""
82 return f"/{resolver_match.route}"
83
84
85def _identity_token(request: Request | None, secret: str) -> str:
86 if not secret or request is None:
87 return ""
88 user = get_request_user(request)
89 if user is None:
90 return ""
91 return encrypt_identity(user.id, secret)