v0.142.0
"""Point-in-time snapshot checks — look at pg_stat_activity / pg_locks
right now, not at accumulated counters. Fire on live incidents
(blocker sessions, long-running idle-in-transaction, stuck queries)."""
  4
from __future__ import annotations

from typing import Any

import psycopg.errors

from .types import CheckItem, CheckResult, CheckStatus, TableOwner
 12
 13_SEVERITY_RANK: dict[CheckStatus, int] = {
 14    "ok": 0,
 15    "skipped": 0,
 16    "error": 0,
 17    "warning": 1,
 18    "critical": 2,
 19}
 20
 21
 22def _escalate(current: CheckStatus, new: CheckStatus) -> CheckStatus:
 23    """Return whichever of `current` or `new` has the higher severity."""
 24    return new if _SEVERITY_RANK[new] > _SEVERITY_RANK[current] else current
 25
 26
 27def check_blocking_queries(
 28    cursor: Any, table_owners: dict[str, TableOwner]
 29) -> CheckResult:
 30    """Queries currently blocking other queries via held locks.
 31
 32    Point-in-time snapshot using pg_blocking_pids. A blocker is a
 33    transaction holding a lock that one or more other transactions are
 34    waiting on. Classic cause of "my app is hanging" in incidents —
 35    common patterns: long-running migration, idle-in-transaction client
 36    forgotten with a row lock, or an aggregate query holding a share lock.
 37
 38    Fires warning at any blocker with victims aged 30s+; critical at 5min+.
 39    """
 40    warn_age_sec = 30
 41    critical_age_sec = 300
 42
 43    # Probe inside psycopg's `transaction()` so a permission-denied error
 44    # rolls back cleanly (via fresh txn in autocommit mode, savepoint in
 45    # transaction mode) and doesn't cascade-fail later checks.
 46    #
 47    # Wait time is taken from pg_locks.waitstart (PG 14+) — the time the
 48    # blocked pid started waiting on its lock. Using query_start would
 49    # over-report (it measures total query runtime, not lock wait), which
 50    # makes severity triage unreliable for long-running statements that
 51    # only block near the end.
 52    try:
 53        with cursor.connection.transaction():
 54            cursor.execute(
 55                """
 56                SELECT
 57                    blocking.pid AS blocker_pid,
 58                    COALESCE(blocking.application_name, '') AS blocker_app,
 59                    COALESCE(blocking.usename, '') AS blocker_user,
 60                    blocking.state AS blocker_state,
 61                    EXTRACT(EPOCH FROM (now() - blocking.state_change))::bigint
 62                        AS blocker_state_age_sec,
 63                    LEFT(COALESCE(blocking.query, ''), 200) AS blocker_query,
 64                    blocked.pid AS blocked_pid,
 65                    COALESCE(blocked.application_name, '') AS blocked_app,
 66                    EXTRACT(
 67                        EPOCH FROM (now() - COALESCE(bl.wait_started, blocked.state_change))
 68                    )::bigint AS blocked_age_sec,
 69                    LEFT(COALESCE(blocked.query, ''), 200) AS blocked_query
 70                FROM pg_stat_activity AS blocked
 71                JOIN pg_stat_activity AS blocking
 72                    ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
 73                LEFT JOIN LATERAL (
 74                    SELECT MIN(waitstart) AS wait_started
 75                    FROM pg_catalog.pg_locks
 76                    WHERE pid = blocked.pid
 77                      AND NOT granted
 78                      AND waitstart IS NOT NULL
 79                ) bl ON TRUE
 80                WHERE blocked.datname = current_database()
 81                  AND blocking.datname = current_database()
 82                ORDER BY blocking.pid, blocked_age_sec DESC
 83                """
 84            )
 85            rows = cursor.fetchall()
 86    except psycopg.errors.InsufficientPrivilege:
 87        return CheckResult(
 88            name="blocking_queries",
 89            label="Blocking queries",
 90            status="skipped",
 91            summary="insufficient privilege to read pg_stat_activity",
 92            items=[],
 93            message="Grant pg_read_all_stats to this role for this check.",
 94            tier="warning",
 95        )
 96    except psycopg.errors.DatabaseError as e:
 97        # Defensive catch: if pg_stat_activity/pg_locks hit an unexpected
 98        # server-side condition (column rename on a newer major, catalog
 99        # lock conflict, etc.), skip gracefully instead of letting the
100        # failure cascade to run_all_checks.
101        return CheckResult(
102            name="blocking_queries",
103            label="Blocking queries",
104            status="skipped",
105            summary="query failed",
106            items=[],
107            message=f"Blocking-queries probe failed: {e}",
108            tier="warning",
109        )
110
111    # Group by blocker pid.
112    grouped: dict[int, dict[str, Any]] = {}
113    for row in rows:
114        (
115            blocker_pid,
116            blocker_app,
117            blocker_user,
118            blocker_state,
119            blocker_age,
120            blocker_query,
121            blocked_pid,
122            blocked_app,
123            blocked_age,
124            blocked_query,
125        ) = row
126        entry = grouped.setdefault(
127            blocker_pid,
128            {
129                "app": blocker_app,
130                "user": blocker_user,
131                "state": blocker_state,
132                "state_age": blocker_age or 0,
133                "query": (blocker_query or "").strip(),
134                "victims": [],
135            },
136        )
137        entry["victims"].append(
138            {
139                "pid": blocked_pid,
140                "app": blocked_app,
141                "age": blocked_age or 0,
142                "query": (blocked_query or "").strip(),
143            }
144        )
145
146    items: list[CheckItem] = []
147    worst_severity: CheckStatus = "ok"
148    for blocker_pid, info in grouped.items():
149        # Age of the longest-waiting victim — that's how urgent this is.
150        oldest_victim_age = max((v["age"] for v in info["victims"]), default=0)
151
152        if oldest_victim_age >= critical_age_sec:
153            severity: CheckStatus = "critical"
154        elif oldest_victim_age >= warn_age_sec:
155            severity = "warning"
156        else:
157            continue
158
159        worst_severity = _escalate(worst_severity, severity)
160
161        victim_lines = []
162        for v in info["victims"]:
163            victim_lines.append(
164                f"      pid {v['pid']}: waiting {v['age']}s — "
165                f"{(v['query'] or '(no query)')[:160]}"
166            )
167
168        app_tag = f" [{info['app']}]" if info["app"] else ""
169        items.append(
170            CheckItem(
171                table="",
172                name=f"pid {blocker_pid}{app_tag}",
173                detail=(
174                    f"blocking {len(info['victims'])} "
175                    f"{'query' if len(info['victims']) == 1 else 'queries'} "
176                    f"(oldest waiting {oldest_victim_age}s); "
177                    f"blocker state: {info['state']} for {info['state_age']}s; "
178                    f"blocker query: {(info['query'] or '(no query)')[:160]}\n"
179                    + "\n".join(victim_lines)
180                ),
181                source="",
182                package="",
183                model_class="",
184                model_file="",
185                suggestion=(
186                    f"If the blocker is stuck, terminate it: "
187                    f"SELECT pg_terminate_backend({blocker_pid});"
188                ),
189                caveats=[],
190            )
191        )
192
193    if worst_severity == "critical":
194        summary = f"{len(items)} blocker(s) with critical-age waiters"
195    elif items:
196        summary = f"{len(items)} blocker(s)"
197    else:
198        summary = "none"
199
200    return CheckResult(
201        name="blocking_queries",
202        label="Blocking queries",
203        status=worst_severity if items else "ok",
204        summary=summary,
205        items=items,
206        message="",
207        tier="warning",
208    )
209
210
def check_long_running_connections(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Client connections stuck in a transaction or running a query for too long.

    Idle-in-transaction holds row locks and blocks autovacuum; prolonged
    active queries often indicate a bad plan, a migration gone wrong, or an
    unindexed cleanup job. Excludes this backend and non-client backends
    (autovacuum workers, walsenders, etc).

    Returns a CheckResult whose status is the worst severity found ("ok"
    when nothing crosses a threshold, "skipped" when the probe cannot run).
    """
    idle_warn_seconds = 60
    idle_critical_seconds = 600  # 10 minutes
    active_warn_seconds = 300  # 5 minutes
    active_critical_seconds = 1800  # 30 minutes

    # Probe inside psycopg's `transaction()` so a permission-denied error
    # rolls back cleanly (via fresh txn in autocommit mode, savepoint in
    # transaction mode) and doesn't cascade-fail later checks.
    try:
        with cursor.connection.transaction():
            cursor.execute("""
                SELECT
                    pid,
                    COALESCE(application_name, '') AS application_name,
                    COALESCE(usename, '') AS usename,
                    state,
                    EXTRACT(EPOCH FROM (now() - state_change))::bigint AS state_age_sec,
                    EXTRACT(EPOCH FROM (now() - query_start))::bigint AS query_age_sec,
                    EXTRACT(EPOCH FROM (now() - xact_start))::bigint AS xact_age_sec,
                    LEFT(COALESCE(query, ''), 200) AS query
                FROM pg_catalog.pg_stat_activity
                WHERE datname = current_database()
                  AND pid <> pg_backend_pid()
                  AND backend_type = 'client backend'
                  AND state IS NOT NULL
            """)
            rows = cursor.fetchall()
    except psycopg.errors.InsufficientPrivilege:
        return CheckResult(
            name="long_running_connections",
            label="Long-running connections",
            status="skipped",
            summary="insufficient privilege to read pg_stat_activity",
            items=[],
            message="Grant pg_read_all_stats to this role for this check.",
            tier="warning",
        )
    except psycopg.errors.DatabaseError as e:
        # Defensive catch, mirroring check_blocking_queries: if
        # pg_stat_activity hits an unexpected server-side condition
        # (column rename on a newer major, catalog lock conflict, etc.),
        # skip gracefully instead of letting the failure cascade to
        # run_all_checks. Note InsufficientPrivilege is a subclass of
        # DatabaseError, so it must be caught first (above).
        return CheckResult(
            name="long_running_connections",
            label="Long-running connections",
            status="skipped",
            summary="query failed",
            items=[],
            message=f"Long-running-connections probe failed: {e}",
            tier="warning",
        )

    items: list[CheckItem] = []
    worst_severity: CheckStatus = "ok"
    for pid, app_name, usename, state, state_age, query_age, xact_age, query in rows:
        # EXTRACT(...) yields NULL when the timestamp column is NULL;
        # normalize to 0 so the threshold comparisons below are safe.
        state_age = state_age or 0
        query_age = query_age or 0
        xact_age = xact_age or 0

        kind: str | None = None  # stays None => row is under every threshold
        age: int = 0
        severity: CheckStatus = "ok"

        if state in ("idle in transaction", "idle in transaction (aborted)"):
            # Age the transaction by xact_start, not state_change — what matters
            # is how long this transaction has been open (holding locks, blocking
            # autovacuum), not just how long it's been sitting idle. A session
            # that spent 20 minutes in an open txn and idled 5 seconds ago is
            # still a 20-minute problem.
            age = xact_age or state_age
            if age >= idle_critical_seconds:
                kind, severity = state, "critical"
            elif age >= idle_warn_seconds:
                kind, severity = state, "warning"
        elif state == "active":
            age = query_age
            if age >= active_critical_seconds:
                kind, severity = "active query", "critical"
            elif age >= active_warn_seconds:
                kind, severity = "active query", "warning"

        if kind is None:
            continue

        worst_severity = _escalate(worst_severity, severity)

        app = f" [{app_name}]" if app_name else ""
        query_excerpt = query.strip() if query else ""
        if query_excerpt:
            query_excerpt = f" query: {query_excerpt}"

        items.append(
            CheckItem(
                table="",
                name=f"pid {pid}{app}",
                detail=f"{kind} for {age}s (user: {usename}){query_excerpt}",
                source="",
                package="",
                model_class="",
                model_file="",
                suggestion=(
                    "Investigate the client — if stuck, "
                    f"run: SELECT pg_terminate_backend({pid});"
                ),
                caveats=[],
            )
        )

    if worst_severity == "critical":
        summary = f"{len(items)} stuck connections (critical)"
    elif items:
        summary = f"{len(items)} long-running connections"
    else:
        summary = "all ok"

    return CheckResult(
        name="long_running_connections",
        label="Long-running connections",
        status=worst_severity if items else "ok",
        summary=summary,
        items=items,
        message="",
        tier="warning",
    )