1from __future__ import annotations
  2
  3import functools
  4import mimetypes
  5import os
  6from email.utils import formatdate, parsedate
  7from io import BytesIO
  8
  9from plain.http import (
 10    FileResponse,
 11    NotFoundError404,
 12    NotModifiedResponse,
 13    RedirectResponse,
 14    Response,
 15    StreamingResponse,
 16)
 17from plain.http.response import ResponseHeaders
 18from plain.runtime import settings
 19from plain.urls import reverse
 20from plain.views import View
 21
 22from .compile import get_compiled_path
 23from .finders import _iter_assets
 24from .manifest import AssetsManifest, get_manifest
 25
 26
 27class AssetView(View):
 28    """
 29    Serve an asset file directly.
 30
 31    This class could be subclassed to further tweak the responses or behavior.
 32    """
 33
 34    def get_manifest(self) -> AssetsManifest:
 35        """Get the assets manifest. Override in tests to provide a custom manifest."""
 36        return get_manifest()
 37
 38    def get_url_path(self) -> str | None:
 39        return self.url_kwargs["path"]
 40
 41    def get(self) -> Response | FileResponse | StreamingResponse:
 42        url_path = self.get_url_path()
 43
 44        if not url_path:
 45            raise NotFoundError404("Asset path not found")
 46
 47        # Make a trailing slash work, but we don't expect it
 48        url_path = url_path.rstrip("/")
 49
 50        # If a CDN is configured, redirect compiled assets there
 51        if settings.ASSETS_CDN_URL:
 52            if cdn_response := self.get_cdn_redirect_response(url_path):
 53                return cdn_response
 54
 55        if settings.DEBUG:
 56            absolute_path = self.get_debug_asset_path(url_path)
 57        else:
 58            absolute_path = self.get_asset_path(url_path)
 59
 60            if settings.ASSETS_REDIRECT_ORIGINAL:
 61                if redirect_response := self.get_redirect_response(url_path):
 62                    return redirect_response
 63
 64        # check_asset_path validates and raises if path is invalid
 65        # After this point, absolute_path is guaranteed to be a valid str
 66        self.check_asset_path(absolute_path)
 67        # Type guard: absolute_path is now str (check_asset_path raises if None/invalid)
 68        assert absolute_path is not None
 69
 70        if encoded_path := self.get_encoded_path(absolute_path):
 71            absolute_path = encoded_path
 72
 73        if range_response := self.get_range_response(absolute_path):
 74            return range_response
 75
 76        if not_modified_response := self.get_conditional_response(absolute_path):
 77            return not_modified_response
 78
 79        content_type, _ = mimetypes.guess_type(absolute_path)
 80
 81        response = FileResponse(
 82            open(absolute_path, "rb"),
 83            filename=os.path.basename(absolute_path),
 84            content_type=content_type,
 85        )
 86        response.headers = self.update_headers(response.headers, absolute_path)
 87        return response
 88
 89    def get_asset_path(self, path: str) -> str:
 90        """Get the path to the compiled asset"""
 91        compiled_path = os.path.abspath(get_compiled_path())
 92        asset_path = os.path.join(compiled_path, path)
 93
 94        # Make sure we don't try to escape the compiled assets path
 95        if not os.path.commonpath([compiled_path, asset_path]) == compiled_path:
 96            raise NotFoundError404("Asset not found")
 97
 98        return asset_path
 99
100    def get_debug_asset_path(self, path: str) -> str | None:
101        """Make a "live" check to find the uncompiled asset in the filesystem"""
102        for asset in _iter_assets():
103            if asset.url_path == path:
104                return asset.absolute_path
105        return None
106
107    def check_asset_path(self, path: str | None) -> None:
108        if not path:
109            raise NotFoundError404("Asset not found")
110
111        if not os.path.exists(path):
112            raise NotFoundError404("Asset not found")
113
114        if os.path.isdir(path):
115            raise NotFoundError404("Asset is a directory")
116
117    @functools.cache
118    def get_last_modified(self, path: str) -> str | None:
119        try:
120            mtime = os.path.getmtime(path)
121        except OSError:
122            mtime = None
123
124        if mtime:
125            return formatdate(mtime, usegmt=True)
126        return None
127
128    @functools.cache
129    def get_etag(self, path: str) -> str:
130        try:
131            mtime = os.path.getmtime(path)
132        except OSError:
133            mtime = 0.0
134
135        timestamp = int(mtime)
136        size = self.get_size(path)
137        return f'"{timestamp:x}-{size:x}"'
138
139    @functools.cache
140    def get_size(self, path: str) -> int:
141        return os.path.getsize(path)
142
143    def update_headers(self, headers: ResponseHeaders, path: str) -> ResponseHeaders:
144        headers.setdefault("Access-Control-Allow-Origin", "*")
145
146        # Always vary on Accept-Encoding
147        vary = headers.get("Vary")
148        if not vary:
149            headers["Vary"] = "Accept-Encoding"
150        elif vary != "*" and "Accept-Encoding" not in vary:
151            headers["Vary"] = vary + ", Accept-Encoding"
152
153        # If the file is compressed, tell the browser
154        if encoding := mimetypes.guess_type(path)[1]:
155            headers.setdefault("Content-Encoding", encoding)
156
157        is_immutable = self.is_immutable(path)
158
159        if is_immutable:
160            max_age = 10 * 365 * 24 * 60 * 60  # 10 years
161            headers.setdefault("Cache-Control", f"max-age={max_age}, immutable")
162        elif settings.DEBUG:
163            # In development, cache for 1 second to avoid re-fetching the same file
164            headers.setdefault("Cache-Control", "max-age=0")
165        else:
166            # Tell the browser to cache the file for 60 seconds if nothing else
167            headers.setdefault("Cache-Control", "max-age=60")
168
169        if not is_immutable:
170            if last_modified := self.get_last_modified(path):
171                headers.setdefault("Last-Modified", last_modified)
172            if etag := self.get_etag(path):
173                headers.setdefault("ETag", etag)
174
175        if "Content-Disposition" in headers:
176            # This header messes up Safari...
177            # https://github.com/evansd/whitenoise/commit/93657cf88e14b919cb726864814617a6a639e507
178            # At some point, should probably look at not using FileResponse at all?
179            del headers["Content-Disposition"]
180
181        return headers
182
183    def is_immutable(self, path: str) -> bool:
184        """
185        Determine whether an asset is immutable (fingerprinted).
186
187        Checks if the path is a fingerprinted path in the manifest.
188        Also handles compressed variants (e.g., main.abc1234.css.gz).
189        """
190        # Convert absolute path to URL-relative path for manifest lookup
191        compiled_path = os.path.abspath(get_compiled_path())
192        if path.startswith(compiled_path):
193            url_path = os.path.relpath(path, compiled_path)
194        else:
195            url_path = path
196
197        manifest = self.get_manifest()
198
199        # Check the path directly, then without compression extension
200        if manifest.is_fingerprinted(url_path):
201            return True
202        if url_path.endswith((".gz", ".br")):
203            return manifest.is_fingerprinted(url_path[:-3])
204        return False
205
206    def get_encoded_path(self, path: str) -> str | None:
207        """
208        If the client supports compression, return the path to the compressed file.
209        Otherwise, return the original path.
210        """
211        accept_encoding = self.request.headers.get("Accept-Encoding")
212        if not accept_encoding:
213            return None
214
215        if "br" in accept_encoding:
216            br_path = path + ".br"
217            if os.path.exists(br_path):
218                return br_path
219
220        if "gzip" in accept_encoding:
221            gzip_path = path + ".gz"
222            if os.path.exists(gzip_path):
223                return gzip_path
224        return None
225
226    def get_redirect_response(self, path: str) -> RedirectResponse | None:
227        """If the asset is not found, try to redirect to the fingerprinted path"""
228        compiled_url_path = self.get_manifest().resolve(path)
229
230        if not compiled_url_path or compiled_url_path == path:
231            # Don't need to redirect if there is no compiled path,
232            # or we're already looking at it (not fingerprinted).
233            return None
234
235        # Import here to avoid circular import (urls.py imports AssetView)
236        from .urls import AssetsRouter
237
238        return RedirectResponse(
239            redirect_to=reverse(
240                f"{AssetsRouter.namespace}:asset", path=compiled_url_path
241            ),
242            headers={
243                "Cache-Control": "max-age=60",  # Can cache this for a short time, but the fingerprinted path can change
244            },
245        )
246
247    def get_cdn_redirect_response(self, path: str) -> RedirectResponse | None:
248        """Redirect to CDN URL for compiled assets.
249
250        Assets not in the manifest (e.g., page assets) are not redirected
251        and will be served locally.
252        """
253        manifest = self.get_manifest()
254
255        if path not in manifest:
256            return None
257
258        redirect_target = manifest[path]
259        final_path = redirect_target or path
260        is_immutable = redirect_target is None and manifest.is_fingerprinted(path)
261
262        if is_immutable:
263            status_code = 301
264            cache_control = "max-age=31536000, immutable"
265        else:
266            status_code = 302
267            cache_control = "max-age=60"
268
269        cdn_url = f"{settings.ASSETS_CDN_URL.rstrip('/')}/{final_path.lstrip('/')}"
270        return RedirectResponse(
271            redirect_to=cdn_url,
272            status_code=status_code,
273            allow_external=True,
274            headers={"Cache-Control": cache_control},
275        )
276
277    def get_conditional_response(self, path: str) -> NotModifiedResponse | None:
278        """
279        Support conditional requests (HTTP 304 response) based on ETag and Last-Modified headers.
280        """
281        if self.request.headers.get("If-None-Match") == self.get_etag(path):
282            return self._not_modified_response(path)
283
284        if "If-Modified-Since" in self.request.headers:
285            if_modified_since = parsedate(self.request.headers["If-Modified-Since"])
286            last_modified = parsedate(self.get_last_modified(path))
287            if (
288                if_modified_since
289                and last_modified
290                and if_modified_since >= last_modified
291            ):
292                return self._not_modified_response(path)
293        return None
294
295    def _not_modified_response(self, path: str) -> NotModifiedResponse:
296        response = NotModifiedResponse()
297        response.headers = self.update_headers(response.headers, path)
298        if not settings.ASSETS_LOG_304:
299            response.log_access = False
300        return response
301
302    def get_range_response(self, path: str) -> Response | StreamingResponse | None:
303        """
304        Support range requests (HTTP 206 response).
305        """
306        range_header = self.request.headers.get("HTTP_RANGE")
307        if not range_header:
308            return None
309
310        file_size = self.get_size(path)
311
312        if not range_header.startswith("bytes="):
313            return Response(
314                status_code=416, headers={"Content-Range": f"bytes */{file_size}"}
315            )
316
317        range_values = range_header.split("=")[1].split("-")
318        start = int(range_values[0]) if range_values[0] else 0
319        end = int(range_values[1]) if range_values[1] else float("inf")
320
321        if start >= file_size:
322            return Response(
323                status_code=416, headers={"Content-Range": f"bytes */{file_size}"}
324            )
325
326        end = int(min(end, file_size - 1))
327
328        with open(path, "rb") as f:
329            f.seek(start)
330            content = f.read(end - start + 1)
331
332        response = StreamingResponse(BytesIO(content), status_code=206)
333        response.headers = self.update_headers(response.headers, path)
334        response.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
335        response.headers["Content-Length"] = str(end - start + 1)
336        return response