v0.146.0
  1from __future__ import annotations
  2
  3from functools import cached_property
  4from typing import Any
  5
  6from plain.toolbar import ToolbarItem, register_toolbar_item
  7from plain.utils.os import get_memory_usage, get_process_cpu_percent
  8
  9from .core import Observer
 10from .formatting import format_bytes
 11
 12
 13def _level(value: int | float, warn: int, danger: int) -> str:
 14    if value >= danger:
 15        return "danger"
 16    if value >= warn:
 17        return "warn"
 18    return "ok"
 19
 20
 21def _get_system_stats() -> dict[str, Any]:
 22    """Get system-level CPU and memory stats."""
 23    cpu_percent = get_process_cpu_percent()
 24    usage_bytes, limit_bytes = get_memory_usage()
 25
 26    stats: dict[str, Any] = {}
 27
 28    if cpu_percent is not None:
 29        stats["cpu_percent"] = cpu_percent
 30        stats["cpu_level"] = _level(cpu_percent, warn=50, danger=80)
 31
 32    if limit_bytes is not None:
 33        mem_percent = round(usage_bytes / limit_bytes * 100)
 34        stats["mem_display"] = f"{mem_percent}%"
 35        stats["mem_title"] = (
 36            f"Container memory: {format_bytes(usage_bytes)} / {format_bytes(limit_bytes)}"
 37        )
 38        stats["mem_level"] = _level(mem_percent, warn=70, danger=90)
 39    else:
 40        stats["mem_display"] = format_bytes(usage_bytes, precision=0)
 41        stats["mem_title"] = "Server process RSS"
 42        stats["mem_level"] = "ok"
 43
 44    return stats
 45
 46
 47def _get_trace_stats(observer: Observer) -> dict[str, Any] | None:
 48    """Get trace-level stats with levels and display values."""
 49    stats = observer.get_current_trace_stats()
 50    if stats is None:
 51        return None
 52
 53    query_count = stats["query_count"]
 54    duplicate_count = stats["duplicate_count"]
 55    duration_ms = stats["duration_ms"]
 56
 57    # Any duplicate queries promote to at least "warn" (N+1 indicator)
 58    query_level = _level(query_count, warn=10, danger=30)
 59    if duplicate_count > 0 and query_level == "ok":
 60        query_level = "warn"
 61
 62    # Format duration for display
 63    if duration_ms is not None:
 64        duration_ms_rounded = round(duration_ms)
 65        if duration_ms >= 1000:
 66            duration_display = f"{duration_ms / 1000:.1f}s"
 67        else:
 68            duration_display = f"{duration_ms_rounded}ms"
 69        duration_level = _level(duration_ms_rounded, warn=200, danger=1000)
 70    else:
 71        duration_display = None
 72        duration_level = "ok"
 73
 74    return {
 75        "query_count": query_count,
 76        "duplicate_count": duplicate_count,
 77        "query_level": query_level,
 78        "duration_display": duration_display,
 79        "duration_level": duration_level,
 80    }
 81
 82
 83@register_toolbar_item
 84class ObserverToolbarItem(ToolbarItem):
 85    name = "Observer"
 86    panel_template_name = "toolbar/observer.html"
 87    button_template_name = "toolbar/observer_button.html"
 88
 89    @cached_property
 90    def observer(self) -> Observer:
 91        """Get the Observer instance for this request."""
 92        return Observer.from_request(self.request)
 93
 94    @cached_property
 95    def _context(self) -> dict[str, Any]:
 96        context = super().get_template_context()
 97        context["observer"] = self.observer
 98        context["system_stats"] = _get_system_stats()
 99        context["trace_stats"] = _get_trace_stats(self.observer)
100        return context
101
102    def get_template_context(self) -> dict[str, Any]:
103        return self._context