1from __future__ import annotations
2
3import os
4import resource
5import sys
6import threading
7import time
8from functools import lru_cache
9
10
11@lru_cache(maxsize=1)
12def _get_cgroup_dir() -> str:
13 """Resolve the process's own cgroup directory for cgroup v2.
14
15 Cached because the cgroup path is static for the lifetime of the process.
16 """
17 cgroup_dir = "/sys/fs/cgroup"
18 try:
19 with open("/proc/self/cgroup") as f:
20 for line in f:
21 # cgroup v2 entries have the form "0::<path>"
22 parts = line.strip().split(":", 2)
23 if (
24 len(parts) == 3
25 and parts[0] == "0"
26 and parts[1] == ""
27 and parts[2] != "/"
28 ):
29 cgroup_dir = f"/sys/fs/cgroup{parts[2]}"
30 break
31 except (FileNotFoundError, IndexError, OSError):
32 pass
33 return cgroup_dir
34
35
36def get_rss_bytes() -> int:
37 """Get the current process's RSS in bytes.
38
39 Uses /proc/self/statm on Linux for current RSS,
40 falls back to ru_maxrss (peak RSS) on macOS.
41 """
42 if sys.platform == "linux":
43 try:
44 with open("/proc/self/statm") as f:
45 resident_pages = int(f.read().split()[1])
46 return resident_pages * os.sysconf("SC_PAGE_SIZE")
47 except (OSError, ValueError, IndexError):
48 pass
49
50 # macOS / fallback: ru_maxrss is peak, not current
51 rusage = resource.getrusage(resource.RUSAGE_SELF)
52 if sys.platform == "darwin":
53 return rusage.ru_maxrss # bytes on macOS
54 return rusage.ru_maxrss * 1024 # KB on Linux (fallback)
55
56
57def get_cpu_count() -> int:
58 """Get the number of CPUs available, respecting cgroup limits in containers.
59
60 os.process_cpu_count() only checks sched_getaffinity, not cgroup CPU quotas,
61 so containers often see the host's full CPU count. This reads the cgroup v2
62 quota file to detect the actual limit.
63
64 Can be removed when minimum Python version is 3.14+ (cpython#120078).
65 """
66 cpu_count = os.process_cpu_count() or 1
67
68 cgroup_dir = _get_cgroup_dir()
69
70 # Check cgroup v2 CPU quota (Docker, Kubernetes, Railway, etc.)
71 try:
72 with open(f"{cgroup_dir}/cpu.max") as f:
73 parts = f.read().strip().split()
74 if len(parts) >= 2 and parts[0] != "max":
75 quota = int(parts[0])
76 period = int(parts[1])
77 cgroup_cpus = max(1, -(-quota // period)) # ceiling division
78 cpu_count = min(cpu_count, cgroup_cpus)
79 except (FileNotFoundError, ValueError, OSError):
80 pass
81
82 # Check cgroup v1 CPU quota (Heroku Cedar, older Docker, etc.)
83 try:
84 with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as f:
85 quota = int(f.read().strip())
86 if quota > 0:
87 with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as f:
88 period = int(f.read().strip())
89 cgroup_cpus = max(1, -(-quota // period))
90 cpu_count = min(cpu_count, cgroup_cpus)
91 except (FileNotFoundError, ValueError, OSError):
92 pass
93
94 return cpu_count
95
96
97def get_memory_usage() -> tuple[int, int | None]:
98 """Get memory usage and optional limit in bytes, container-aware.
99
100 Returns (usage_bytes, limit_bytes) for containers with cgroup limits,
101 or (usage_bytes, None) for bare-metal/dev where only process RSS is available.
102
103 Tries cgroup v2 first, then v1, then falls back to process RSS via getrusage.
104 """
105 cgroup_dir = _get_cgroup_dir()
106
107 # cgroup v2
108 try:
109 with open(f"{cgroup_dir}/memory.current") as f:
110 usage = int(f.read().strip())
111 with open(f"{cgroup_dir}/memory.max") as f:
112 content = f.read().strip()
113 limit = None if content == "max" else int(content)
114 return (usage, limit)
115 except (FileNotFoundError, ValueError, OSError):
116 pass
117
118 # cgroup v1
119 try:
120 with open("/sys/fs/cgroup/memory/memory.usage_in_bytes") as f:
121 usage = int(f.read().strip())
122 with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
123 limit = int(f.read().strip())
124 # cgroup v1 uses a very large number to mean "unlimited"
125 if limit >= 2**62:
126 limit = None
127 return (usage, limit)
128 except (FileNotFoundError, ValueError, OSError):
129 pass
130
131 # Fallback: process RSS (works on macOS and Linux)
132 return (get_rss_bytes(), None)
133
134
135_cpu_lock = threading.Lock()
136_last_cpu_time: float = 0.0
137_last_wall_time: float = 0.0
138
139
140def get_process_cpu_percent() -> int | None:
141 """Get this process's CPU usage as a percentage of wall-clock time.
142
143 Uses resource.getrusage() which reports actual CPU time consumed,
144 accurate in both containers and on bare metal.
145
146 Returns None on the first call (no baseline to compare against).
147 Subsequent calls return the CPU percent since the previous call.
148 """
149 global _last_cpu_time, _last_wall_time
150
151 usage = resource.getrusage(resource.RUSAGE_SELF)
152 cpu_time = usage.ru_utime + usage.ru_stime
153 wall_time = time.monotonic()
154
155 with _cpu_lock:
156 if _last_wall_time == 0.0:
157 _last_cpu_time = cpu_time
158 _last_wall_time = wall_time
159 return None
160
161 wall_delta = wall_time - _last_wall_time
162 cpu_delta = cpu_time - _last_cpu_time
163
164 _last_cpu_time = cpu_time
165 _last_wall_time = wall_time
166
167 if wall_delta <= 0:
168 return 0
169
170 return min(round(cpu_delta / wall_delta * 100), 100)