1from __future__ import annotations
2
3import os
4import re
5from typing import Any
6
7import psycopg.errors
8
9from .types import PgssAvailability
10
11
12def _format_bytes(nbytes: int) -> str:
13 value = float(nbytes)
14 for unit in ("B", "kB", "MB", "GB", "TB"):
15 if abs(value) < 1024:
16 if unit == "B":
17 return f"{int(value)} {unit}"
18 return f"{value:.1f} {unit}"
19 value /= 1024
20 return f"{value:.1f} PB"
21
22
23def _display_path(path: str) -> str:
24 """Format an absolute source path relative to the Plain project root
25 when possible, so CLI output matches the project-root-relative paths
26 users actually see in their editor — and stays stable no matter where
27 the CLI is invoked from. Falls back to cwd-relative, then to absolute
28 when neither form is useful (different drive, or path is outside the
29 project)."""
30 if not path:
31 return path
32
33 # Prefer Plain's project root (APP_PATH.parent) so output doesn't drift
34 # based on the invoking shell's cwd. Imported lazily to avoid a cycle
35 # and to tolerate contexts where runtime isn't set up.
36 bases: list[str] = []
37 try:
38 from plain.runtime import APP_PATH
39
40 bases.append(str(APP_PATH.parent))
41 except Exception:
42 pass
43 bases.append(os.getcwd())
44
45 for base in bases:
46 try:
47 rel = os.path.relpath(path, base)
48 except ValueError:
49 continue
50 if not rel.startswith(".."):
51 return rel
52
53 return path
54
55
56def _index_suggestion(
57 *,
58 source: str,
59 package: str,
60 model_class: str = "",
61 model_file: str = "",
62 app_suggestion: str,
63 unmanaged_suggestion: str,
64) -> str:
65 """Return the appropriate suggestion based on table ownership.
66
67 When model_class and model_file are known, prefixes the app_suggestion
68 with a concrete file:class pointer so agents and humans can jump
69 directly to the code.
70 """
71 if source == "app":
72 if model_class and model_file:
73 return f"{_display_path(model_file)} :: {model_class} — {app_suggestion}"
74 if model_class:
75 return f"On model {model_class}: {app_suggestion}"
76 return app_suggestion
77 elif source == "package":
78 return f"Managed by {package} — not directly actionable in your app"
79 return unmanaged_suggestion
80
81
82def _pgss_usable(cursor: Any) -> PgssAvailability:
83 """Check pg_stat_statements availability for this role.
84
85 Returns three states so callers can surface the right remediation —
86 "install the extension" vs "grant pg_read_all_stats" are completely
87 different fixes, and conflating them sends users down the wrong path.
88
89 Probes inside psycopg's `transaction()` so a permission-denied error
90 (common on managed Postgres where the extension is installed but
91 only readable by admin roles) rolls back cleanly — whether the outer
92 connection is in autocommit mode or inside a transaction — without
93 cascade-failing every later check.
94 """
95 cursor.execute(
96 "SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'"
97 )
98 if not cursor.fetchone():
99 return "not_installed"
100 try:
101 with cursor.connection.transaction():
102 cursor.execute("SELECT 1 FROM pg_stat_statements LIMIT 1")
103 except psycopg.errors.DatabaseError:
104 return "no_permission"
105 return "usable"
106
107
108def _top_queries_for_table(
109 cursor: Any, table_name: str, limit: int = 3
110) -> list[dict[str, Any]]:
111 """Pull top queries against a table from pg_stat_statements.
112
113 Caller must confirm pg_stat_statements is usable (via ``_pgss_usable``)
114 before calling.
115
116 Matches on the table name appearing as a whole identifier, not a
117 substring: uses a POSIX regex with word boundaries so `user_profile`
118 doesn't accidentally match queries referencing `user` (SQL LIKE treats
119 `_` as a single-char wildcard, which made the previous ILIKE approach
120 both miss unquoted references and over-match underscored names).
121 """
122 # Escape regex metacharacters in the user-provided name, then build a
123 # pattern that requires a non-identifier character (or string edge) on
124 # either side. Identifier chars per Postgres are [A-Za-z0-9_].
125 escaped = re.escape(table_name)
126 pattern = rf"(^|[^A-Za-z0-9_]){escaped}($|[^A-Za-z0-9_])"
127 # toplevel=true filters out queries executed inside functions/procs
128 # (PG 14+ column, guaranteed on Plain's PG 16+ minimum). Without it,
129 # a hot stored proc's inner SELECT can dominate the "top queries"
130 # list and obscure the real app-level culprits.
131 cursor.execute(
132 """
133 SELECT
134 calls,
135 ROUND(total_exec_time::numeric, 2) AS total_ms,
136 rows,
137 shared_blks_hit + shared_blks_read AS blks_total,
138 LEFT(query, 300) AS query
139 FROM pg_stat_statements
140 WHERE query ~ %(pat)s
141 AND toplevel
142 AND query !~* '^\\s*EXPLAIN\\M'
143 ORDER BY total_exec_time DESC
144 LIMIT %(limit)s
145 """,
146 {"pat": pattern, "limit": limit},
147 )
148
149 out: list[dict[str, Any]] = []
150 for calls, total_ms, rows_returned, blks_total, query in cursor.fetchall():
151 calls = calls or 0
152 blks_total = blks_total or 0
153 rows_returned = rows_returned or 0
154 out.append(
155 {
156 "calls": calls,
157 "total_ms": float(total_ms or 0),
158 "rows_returned": rows_returned,
159 "blks_per_call": round(blks_total / calls, 1) if calls else 0,
160 "rows_per_call": round(rows_returned / calls, 2) if calls else 0,
161 "query": " ".join(query.split()) if query else "",
162 }
163 )
164 return out