v0.150.0
  1"""
  2This module contains helper functions for controlling caching. It does so by
  3managing the "Vary" header of responses. It includes functions to patch the
  4header of response objects directly and decorators that change functions to do
  5that header-patching themselves.
  6
  7For information on the Vary header, see RFC 9110 Section 12.5.5.
  8
  9Essentially, the "Vary" HTTP header defines which headers a cache should take
 10into account when building its cache key. Requests with the same path but
 11different header content for headers named in "Vary" need to get different
 12cache keys to prevent delivery of wrong content.
 13
 14An example: i18n middleware would need to distinguish caches by the
 15"Accept-language" header.
 16"""
 17
 18from __future__ import annotations
 19
 20import time
 21from collections import defaultdict
 22from typing import TYPE_CHECKING, Any
 23
 24from .http import http_date
 25from .regex_helper import _lazy_re_compile
 26
 27if TYPE_CHECKING:
 28    from plain.http import Response
 29
 30_cc_delim_re = _lazy_re_compile(r"\s*,\s*")
 31
 32
 33def patch_response_headers(response: Response, cache_timeout: int | float) -> None:
 34    """
 35    Add HTTP caching headers to the given HttpResponse: Expires and
 36    Cache-Control.
 37
 38    Each header is only added if it isn't already set.
 39    """
 40    if cache_timeout < 0:
 41        cache_timeout = 0  # Can't have max-age negative
 42    if "Expires" not in response.headers:
 43        response.headers["Expires"] = http_date(time.time() + cache_timeout)
 44    patch_cache_control(response, max_age=cache_timeout)
 45
 46
 47def add_never_cache_headers(response: Response) -> None:
 48    """
 49    Add headers to a response to indicate that a page should never be cached.
 50    """
 51    patch_response_headers(response, cache_timeout=-1)
 52    patch_cache_control(
 53        response, no_cache=True, no_store=True, must_revalidate=True, private=True
 54    )
 55
 56
 57def patch_cache_control(response: Response, **kwargs: Any) -> None:
 58    """
 59    Patch the Cache-Control header by adding all keyword arguments to it.
 60    The transformation is as follows:
 61
 62    * All keyword parameter names are turned to lowercase, and underscores
 63      are converted to hyphens.
 64    * If the value of a parameter is True (exactly True, not just a
 65      true value), only the parameter name is added to the header.
 66    * All other parameters are added with their value, after applying
 67      str() to it.
 68    """
 69
 70    def dictitem(s: str) -> tuple[str, str | bool]:
 71        t = s.split("=", 1)
 72        if len(t) > 1:
 73            return (t[0].lower(), t[1])
 74        else:
 75            return (t[0].lower(), True)
 76
 77    def dictvalue(*t: str | bool) -> str:
 78        if t[1] is True:
 79            return str(t[0])
 80        else:
 81            return f"{t[0]}={t[1]}"
 82
 83    cc = defaultdict(set)
 84    if response.headers.get("Cache-Control"):
 85        for field in _cc_delim_re.split(response.headers["Cache-Control"]):
 86            directive, value = dictitem(field)
 87            if directive == "no-cache":
 88                # no-cache supports multiple field names.
 89                cc[directive].add(value)
 90            else:
 91                cc[directive] = value
 92
 93    # If there's already a max-age header but we're being asked to set a new
 94    # max-age, use the minimum of the two ages. In practice this happens when
 95    # a decorator and a piece of middleware both operate on a given view.
 96    if "max-age" in cc and "max_age" in kwargs:
 97        kwargs["max_age"] = min(int(cc["max-age"]), kwargs["max_age"])
 98
 99    # Allow overriding private caching and vice versa
100    if "private" in cc and "public" in kwargs:
101        del cc["private"]
102    elif "public" in cc and "private" in kwargs:
103        del cc["public"]
104
105    for k, v in kwargs.items():
106        directive = k.replace("_", "-")
107        if directive == "no-cache":
108            # no-cache supports multiple field names.
109            cc[directive].add(v)
110        else:
111            cc[directive] = v
112
113    directives = []
114    for directive, values in cc.items():
115        if isinstance(values, set):
116            if True in values:
117                # True takes precedence.
118                values = {True}
119            directives.extend([dictvalue(directive, value) for value in values])
120        else:
121            directives.append(dictvalue(directive, values))
122    cc = ", ".join(directives)
123    response.headers["Cache-Control"] = cc
124
125
def patch_vary_headers(response: Response, newheaders: list[str]) -> None:
    """
    Add (or update) the "Vary" header in the given Response object.

    newheaders is a list of header names that should be in "Vary". If the
    resulting list contains an asterisk, the "Vary" header will consist of a
    single asterisk '*'. Otherwise, existing headers in "Vary" aren't removed
    and keep their order; names from newheaders are appended only if they are
    not already present. Names are compared case-insensitively, both against
    the existing header and against earlier entries of newheaders.
    """
    # Note that we need to keep the original order intact, because cache
    # implementations may rely on the order of the Vary contents in, say,
    # computing an MD5 hash.
    if "Vary" in response.headers:
        # Drop empty members (from an empty value or a stray comma) so the
        # rebuilt header never starts with or contains a dangling ", ".
        vary_headers = [
            header
            for header in _cc_delim_re.split(response.headers["Vary"])
            if header
        ]
    else:
        vary_headers = []

    # Use .lower() here so we treat headers as case-insensitive. The set is
    # updated as names are appended so duplicates within newheaders are
    # skipped too.
    seen = {header.lower() for header in vary_headers}
    for newheader in newheaders:
        key = newheader.lower()
        if key not in seen:
            seen.add(key)
            vary_headers.append(newheader)

    if "*" in vary_headers:
        response.headers["Vary"] = "*"
    else:
        response.headers["Vary"] = ", ".join(vary_headers)
152
153
def _to_tuple(s: str) -> tuple[str, str | bool]:
    """
    Split a ``name=value`` string at the first "=".

    Returns the lowercased name paired with the value text, or with True when
    the string contains no "=" at all.
    """
    name, separator, value = s.partition("=")
    return name.lower(), (value if separator else True)
158    return t[0].lower(), True