v0.142.0
  1from __future__ import annotations
  2
  3import os
  4import re
  5from typing import Any
  6
  7import psycopg.errors
  8
  9from .types import PgssAvailability
 10
 11
 12def _format_bytes(nbytes: int) -> str:
 13    value = float(nbytes)
 14    for unit in ("B", "kB", "MB", "GB", "TB"):
 15        if abs(value) < 1024:
 16            if unit == "B":
 17                return f"{int(value)} {unit}"
 18            return f"{value:.1f} {unit}"
 19        value /= 1024
 20    return f"{value:.1f} PB"
 21
 22
 23def _display_path(path: str) -> str:
 24    """Format an absolute source path relative to the Plain project root
 25    when possible, so CLI output matches the project-root-relative paths
 26    users actually see in their editor — and stays stable no matter where
 27    the CLI is invoked from. Falls back to cwd-relative, then to absolute
 28    when neither form is useful (different drive, or path is outside the
 29    project)."""
 30    if not path:
 31        return path
 32
 33    # Prefer Plain's project root (APP_PATH.parent) so output doesn't drift
 34    # based on the invoking shell's cwd. Imported lazily to avoid a cycle
 35    # and to tolerate contexts where runtime isn't set up.
 36    bases: list[str] = []
 37    try:
 38        from plain.runtime import APP_PATH
 39
 40        bases.append(str(APP_PATH.parent))
 41    except Exception:
 42        pass
 43    bases.append(os.getcwd())
 44
 45    for base in bases:
 46        try:
 47            rel = os.path.relpath(path, base)
 48        except ValueError:
 49            continue
 50        if not rel.startswith(".."):
 51            return rel
 52
 53    return path
 54
 55
 56def _index_suggestion(
 57    *,
 58    source: str,
 59    package: str,
 60    model_class: str = "",
 61    model_file: str = "",
 62    app_suggestion: str,
 63    unmanaged_suggestion: str,
 64) -> str:
 65    """Return the appropriate suggestion based on table ownership.
 66
 67    When model_class and model_file are known, prefixes the app_suggestion
 68    with a concrete file:class pointer so agents and humans can jump
 69    directly to the code.
 70    """
 71    if source == "app":
 72        if model_class and model_file:
 73            return f"{_display_path(model_file)} :: {model_class}{app_suggestion}"
 74        if model_class:
 75            return f"On model {model_class}: {app_suggestion}"
 76        return app_suggestion
 77    elif source == "package":
 78        return f"Managed by {package} — not directly actionable in your app"
 79    return unmanaged_suggestion
 80
 81
 82def _pgss_usable(cursor: Any) -> PgssAvailability:
 83    """Check pg_stat_statements availability for this role.
 84
 85    Returns three states so callers can surface the right remediation —
 86    "install the extension" vs "grant pg_read_all_stats" are completely
 87    different fixes, and conflating them sends users down the wrong path.
 88
 89    Probes inside psycopg's `transaction()` so a permission-denied error
 90    (common on managed Postgres where the extension is installed but
 91    only readable by admin roles) rolls back cleanly — whether the outer
 92    connection is in autocommit mode or inside a transaction — without
 93    cascade-failing every later check.
 94    """
 95    cursor.execute(
 96        "SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'"
 97    )
 98    if not cursor.fetchone():
 99        return "not_installed"
100    try:
101        with cursor.connection.transaction():
102            cursor.execute("SELECT 1 FROM pg_stat_statements LIMIT 1")
103    except psycopg.errors.DatabaseError:
104        return "no_permission"
105    return "usable"
106
107
def _top_queries_for_table(
    cursor: Any, table_name: str, limit: int = 3
) -> list[dict[str, Any]]:
    """Return the costliest pg_stat_statements entries that mention a table.

    Caller must confirm pg_stat_statements is usable (via ``_pgss_usable``)
    before calling.

    The table name has to appear as a whole identifier: the query text is
    matched with a POSIX regex that demands a non-identifier character (or
    the start/end of the text) on both sides of the name, so looking up
    `user` does not pick up `user_profile`. Rows are ordered by total
    execution time, highest first, and capped at ``limit``.

    Each result dict has the keys ``calls``, ``total_ms``,
    ``rows_returned``, ``blks_per_call``, ``rows_per_call`` and ``query``
    (statement text truncated to 300 characters by the SQL, with
    whitespace runs collapsed to single spaces). NULL columns are
    reported as zero / empty string, and the per-call ratios are 0 when
    ``calls`` is zero.
    """
    # Identifier chars per Postgres are [A-Za-z0-9_]; anything else (or a
    # string edge) counts as a boundary around the regex-escaped name.
    non_ident = "[^A-Za-z0-9_]"
    pattern = "(^|" + non_ident + ")" + re.escape(table_name) + "($|" + non_ident + ")"

    # toplevel=true filters out queries executed inside functions/procs
    # (PG 14+ column, guaranteed on Plain's PG 16+ minimum). Without it,
    # a hot stored proc's inner SELECT can dominate the "top queries"
    # list and obscure the real app-level culprits.
    sql = """
        SELECT
            calls,
            ROUND(total_exec_time::numeric, 2) AS total_ms,
            rows,
            shared_blks_hit + shared_blks_read AS blks_total,
            LEFT(query, 300) AS query
        FROM pg_stat_statements
        WHERE query ~ %(pat)s
          AND toplevel
          AND query !~* '^\\s*EXPLAIN\\M'
        ORDER BY total_exec_time DESC
        LIMIT %(limit)s
        """
    cursor.execute(sql, {"pat": pattern, "limit": limit})

    summaries: list[dict[str, Any]] = []
    for n_calls, exec_ms, n_rows, n_blocks, text in cursor.fetchall():
        # Treat NULL counters as zero before doing any arithmetic.
        n_calls = n_calls or 0
        n_blocks = n_blocks or 0
        n_rows = n_rows or 0

        if n_calls:
            blks_per_call = round(n_blocks / n_calls, 1)
            rows_per_call = round(n_rows / n_calls, 2)
        else:
            # No recorded calls: avoid dividing by zero.
            blks_per_call = 0
            rows_per_call = 0

        # Collapse newlines/indentation so the statement prints on one line.
        flattened = " ".join(text.split()) if text else ""

        summaries.append(
            {
                "calls": n_calls,
                "total_ms": float(exec_ms or 0),
                "rows_returned": n_rows,
                "blks_per_call": blks_per_call,
                "rows_per_call": rows_per_call,
                "query": flattened,
            }
        )
    return summaries
164    return out