1from __future__ import annotations
2
3from functools import cached_property
4from typing import Any
5
6from plain.toolbar import ToolbarItem, register_toolbar_item
7from plain.utils.os import get_memory_usage, get_process_cpu_percent
8
9from .core import Observer
10from .formatting import format_bytes
11
12
13def _level(value: int | float, warn: int, danger: int) -> str:
14 if value >= danger:
15 return "danger"
16 if value >= warn:
17 return "warn"
18 return "ok"
19
20
21def _get_system_stats() -> dict[str, Any]:
22 """Get system-level CPU and memory stats."""
23 cpu_percent = get_process_cpu_percent()
24 usage_bytes, limit_bytes = get_memory_usage()
25
26 stats: dict[str, Any] = {}
27
28 if cpu_percent is not None:
29 stats["cpu_percent"] = cpu_percent
30 stats["cpu_level"] = _level(cpu_percent, warn=50, danger=80)
31
32 if limit_bytes is not None:
33 mem_percent = round(usage_bytes / limit_bytes * 100)
34 stats["mem_display"] = f"{mem_percent}%"
35 stats["mem_title"] = (
36 f"Container memory: {format_bytes(usage_bytes)} / {format_bytes(limit_bytes)}"
37 )
38 stats["mem_level"] = _level(mem_percent, warn=70, danger=90)
39 else:
40 stats["mem_display"] = format_bytes(usage_bytes, precision=0)
41 stats["mem_title"] = "Server process RSS"
42 stats["mem_level"] = "ok"
43
44 return stats
45
46
47def _get_trace_stats(observer: Observer) -> dict[str, Any] | None:
48 """Get trace-level stats with levels and display values."""
49 stats = observer.get_current_trace_stats()
50 if stats is None:
51 return None
52
53 query_count = stats["query_count"]
54 duplicate_count = stats["duplicate_count"]
55 duration_ms = stats["duration_ms"]
56
57 # Any duplicate queries promote to at least "warn" (N+1 indicator)
58 query_level = _level(query_count, warn=10, danger=30)
59 if duplicate_count > 0 and query_level == "ok":
60 query_level = "warn"
61
62 # Format duration for display
63 if duration_ms is not None:
64 duration_ms_rounded = round(duration_ms)
65 if duration_ms >= 1000:
66 duration_display = f"{duration_ms / 1000:.1f}s"
67 else:
68 duration_display = f"{duration_ms_rounded}ms"
69 duration_level = _level(duration_ms_rounded, warn=200, danger=1000)
70 else:
71 duration_display = None
72 duration_level = "ok"
73
74 return {
75 "query_count": query_count,
76 "duplicate_count": duplicate_count,
77 "query_level": query_level,
78 "duration_display": duration_display,
79 "duration_level": duration_level,
80 }
81
82
83@register_toolbar_item
84class ObserverToolbarItem(ToolbarItem):
85 name = "Observer"
86 panel_template_name = "toolbar/observer.html"
87 button_template_name = "toolbar/observer_button.html"
88
89 @cached_property
90 def observer(self) -> Observer:
91 """Get the Observer instance for this request."""
92 return Observer.from_request(self.request)
93
94 @cached_property
95 def _context(self) -> dict[str, Any]:
96 context = super().get_template_context()
97 context["observer"] = self.observer
98 context["system_stats"] = _get_system_stats()
99 context["trace_stats"] = _get_trace_stats(self.observer)
100 return context
101
102 def get_template_context(self) -> dict[str, Any]:
103 return self._context