1"""
2This module contains helper functions for controlling caching. It does so by
3managing the "Vary" header of responses. It includes functions to patch the
4header of response objects directly and decorators that change functions to do
5that header-patching themselves.
6
7For information on the Vary header, see RFC 9110 Section 12.5.5.
8
9Essentially, the "Vary" HTTP header defines which headers a cache should take
10into account when building its cache key. Requests with the same path but
11different header content for headers named in "Vary" need to get different
12cache keys to prevent delivery of wrong content.
13
14An example: i18n middleware would need to distinguish caches by the
15"Accept-language" header.
16"""
17
18from __future__ import annotations
19
20import time
21from collections import defaultdict
22from typing import TYPE_CHECKING, Any
23
24from .http import http_date
25from .regex_helper import _lazy_re_compile
26
27if TYPE_CHECKING:
28 from plain.http import Response
29
# Matches a single comma together with any whitespace on either side of it,
# i.e. the delimiter between elements of a comma-separated header value.
_cc_delim_re = _lazy_re_compile(r"\s*,\s*")
31
32
def patch_response_headers(response: Response, cache_timeout: int | float) -> None:
    """
    Add HTTP caching headers to the given Response: Expires and
    Cache-Control.

    ``cache_timeout`` is a lifetime in seconds. Negative values are treated
    as 0. The Expires header is only written when the response does not
    already carry one; the max-age directive is handed to
    patch_cache_control(), which merges it with any existing Cache-Control
    header.
    """
    if cache_timeout < 0:
        cache_timeout = 0  # Can't have max-age negative
    if "Expires" not in response.headers:
        response.headers["Expires"] = http_date(time.time() + cache_timeout)
    # max-age takes a whole number of seconds, so a fractional timeout is
    # truncated (rounding down never makes a response look fresher than the
    # caller asked for).
    patch_cache_control(response, max_age=int(cache_timeout))
45
46
def add_never_cache_headers(response: Response) -> None:
    """
    Mark the given response as one that must never be cached.

    A negative timeout is passed to patch_response_headers() first, then the
    no-cache, no-store, must-revalidate and private directives are merged
    into Cache-Control.
    """
    patch_response_headers(response, cache_timeout=-1)
    never_cache_directives = {
        "no_cache": True,
        "no_store": True,
        "must_revalidate": True,
        "private": True,
    }
    patch_cache_control(response, **never_cache_directives)
55
56
def patch_cache_control(response: Response, **kwargs: Any) -> None:
    """
    Patch the Cache-Control header by adding all keyword arguments to it.
    The transformation is as follows:

    * All keyword parameter names are turned to lowercase, and underscores
      are converted to hyphens.
    * If the value of a parameter is True (exactly True, not just a
      true value), only the parameter name is added to the header.
    * All other parameters are added with their value, after applying
      str() to it.

    Directives already present in the header are kept, in their original
    order, unless a keyword argument replaces them:

    * ``no-cache`` may appear several times (once per field name); values
      accumulate in the order they were first seen, and a bare ``no-cache``
      supersedes all of them.
    * If both the header and the keyword arguments carry ``max-age``, the
      smaller of the two is used. An existing ``max-age`` that is not an
      integer is ignored.
    * Passing ``public`` removes an existing ``private`` and vice versa.

    Empty elements in the existing header (for example after a trailing
    comma) are ignored.
    """
    # Normalise the keyword names once so every later comparison uses the
    # directive spelling.
    requested = {
        name.replace("_", "-").lower(): value for name, value in kwargs.items()
    }

    # Directive name -> value, in output order. "no-cache" maps to a dict
    # used as an insertion-ordered set of its values.
    directives: dict[str, Any] = {}

    def merge(name: str, value: Any) -> None:
        if name == "no-cache":
            # no-cache supports multiple field names.
            directives.setdefault(name, {})[value] = None
        else:
            directives[name] = value

    existing_header = response.headers.get("Cache-Control")
    if existing_header:
        for field in existing_header.split(","):
            field = field.strip()
            if not field:
                continue
            name, has_value, value = field.partition("=")
            merge(name.lower(), value if has_value else True)

    # If there's already a max-age header but we're being asked to set a new
    # max-age, use the minimum of the two ages. In practice this happens when
    # a decorator and a piece of middleware both operate on a given view.
    current_max_age = directives.get("max-age")
    if isinstance(current_max_age, str) and "max-age" in requested:
        try:
            parsed_max_age = int(current_max_age)
        except ValueError:
            # A malformed existing value cannot be compared; the requested
            # one simply replaces it below.
            pass
        else:
            requested["max-age"] = min(parsed_max_age, requested["max-age"])

    # Allow overriding private caching and vice versa
    if "private" in directives and "public" in requested:
        del directives["private"]
    elif "public" in directives and "private" in requested:
        del directives["public"]

    for name, value in requested.items():
        merge(name, value)

    rendered = []
    for name, value in directives.items():
        if name == "no-cache":
            # True takes precedence over any individual field names.
            values = [True] if True in value else list(value)
        else:
            values = [value]
        rendered.extend(name if v is True else f"{name}={v}" for v in values)
    response.headers["Cache-Control"] = ", ".join(rendered)
124
125
def patch_vary_headers(response: Response, newheaders: list[str]) -> None:
    """
    Add (or update) the "Vary" header in the given Response object.
    newheaders is a list of header names that should be in "Vary". If headers
    contains an asterisk, then "Vary" header will consist of a single asterisk
    '*'. Otherwise, existing headers in "Vary" aren't removed.

    Header names are compared case-insensitively: a name from newheaders is
    only appended if it is not already listed, and it is appended at most
    once even if newheaders repeats it. Empty elements in the existing header
    value are dropped.
    """
    # Note that we need to keep the original order intact, because cache
    # implementations may rely on the order of the Vary contents in, say,
    # computing an MD5 hash.
    if "Vary" in response.headers:
        elements = (part.strip() for part in response.headers["Vary"].split(","))
        vary_headers = [name for name in elements if name]
    else:
        vary_headers = []

    # Use .lower() here so we treat headers as case-insensitive.
    seen = {name.lower() for name in vary_headers}
    for name in newheaders:
        key = name.lower()
        if key not in seen:
            # Record the name immediately so later duplicates in newheaders
            # are skipped as well.
            seen.add(key)
            vary_headers.append(name)

    if "*" in vary_headers:
        response.headers["Vary"] = "*"
    else:
        response.headers["Vary"] = ", ".join(vary_headers)
152
153
def _to_tuple(s: str) -> tuple[str, str | bool]:
    """
    Split a ``name=value`` string at its first "=" into a (name, value) pair.

    The name is lowercased. When the string contains no "=", the value is
    True.
    """
    name, separator, value = s.partition("=")
    return (name.lower(), value) if separator else (name.lower(), True)