1"""Point-in-time snapshot checks — look at pg_stat_activity / pg_locks
2right now, not at accumulated counters. Fire on live incidents
3(blocker sessions, long-running idle-in-transaction, stuck queries)."""
4
5from __future__ import annotations
6
7from typing import Any
8
9import psycopg.errors
10
11from .types import CheckItem, CheckResult, CheckStatus, TableOwner
12
13_SEVERITY_RANK: dict[CheckStatus, int] = {
14 "ok": 0,
15 "skipped": 0,
16 "error": 0,
17 "warning": 1,
18 "critical": 2,
19}
20
21
22def _escalate(current: CheckStatus, new: CheckStatus) -> CheckStatus:
23 """Return whichever of `current` or `new` has the higher severity."""
24 return new if _SEVERITY_RANK[new] > _SEVERITY_RANK[current] else current
25
26
def _group_blocker_rows(rows: list[Any]) -> dict[int, dict[str, Any]]:
    """Group raw (blocker, blocked) pairs by blocker pid.

    Each value carries the blocker session's metadata plus a ``victims``
    list of the sessions waiting on it. The tuple layout must match the
    SELECT column order in check_blocking_queries.
    """
    grouped: dict[int, dict[str, Any]] = {}
    for row in rows:
        (
            blocker_pid,
            blocker_app,
            blocker_user,
            blocker_state,
            blocker_age,
            blocker_query,
            blocked_pid,
            blocked_app,
            blocked_age,
            blocked_query,
        ) = row
        # First sighting of a blocker pid wins for its metadata; every
        # row contributes one victim entry.
        entry = grouped.setdefault(
            blocker_pid,
            {
                "app": blocker_app,
                "user": blocker_user,
                "state": blocker_state,
                "state_age": blocker_age or 0,
                "query": (blocker_query or "").strip(),
                "victims": [],
            },
        )
        entry["victims"].append(
            {
                "pid": blocked_pid,
                "app": blocked_app,
                "age": blocked_age or 0,
                "query": (blocked_query or "").strip(),
            }
        )
    return grouped


def _blocker_item(
    blocker_pid: int, info: dict[str, Any], oldest_victim_age: int
) -> CheckItem:
    """Render one grouped blocker and its victims as a CheckItem."""
    victim_lines = [
        f" pid {v['pid']}: waiting {v['age']}s — "
        f"{(v['query'] or '(no query)')[:160]}"
        for v in info["victims"]
    ]
    app_tag = f" [{info['app']}]" if info["app"] else ""
    return CheckItem(
        table="",
        name=f"pid {blocker_pid}{app_tag}",
        detail=(
            f"blocking {len(info['victims'])} "
            f"{'query' if len(info['victims']) == 1 else 'queries'} "
            f"(oldest waiting {oldest_victim_age}s); "
            f"blocker state: {info['state']} for {info['state_age']}s; "
            f"blocker query: {(info['query'] or '(no query)')[:160]}\n"
            + "\n".join(victim_lines)
        ),
        source="",
        package="",
        model_class="",
        model_file="",
        suggestion=(
            f"If the blocker is stuck, terminate it: "
            f"SELECT pg_terminate_backend({blocker_pid});"
        ),
        caveats=[],
    )


def check_blocking_queries(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Queries currently blocking other queries via held locks.

    Point-in-time snapshot using pg_blocking_pids. A blocker is a
    transaction holding a lock that one or more other transactions are
    waiting on. Classic cause of "my app is hanging" in incidents —
    common patterns: long-running migration, idle-in-transaction client
    forgotten with a row lock, or an aggregate query holding a share lock.

    Fires warning at any blocker with victims aged 30s+; critical at 5min+.

    ``table_owners`` is part of the common check signature and unused here.
    """
    warn_age_sec = 30
    critical_age_sec = 300

    # Probe inside psycopg's `transaction()` so a permission-denied error
    # rolls back cleanly (via fresh txn in autocommit mode, savepoint in
    # transaction mode) and doesn't cascade-fail later checks.
    #
    # Wait time is taken from pg_locks.waitstart (PG 14+) — the time the
    # blocked pid started waiting on its lock. Using query_start would
    # over-report (it measures total query runtime, not lock wait), which
    # makes severity triage unreliable for long-running statements that
    # only block near the end.
    try:
        with cursor.connection.transaction():
            cursor.execute(
                """
                SELECT
                    blocking.pid AS blocker_pid,
                    COALESCE(blocking.application_name, '') AS blocker_app,
                    COALESCE(blocking.usename, '') AS blocker_user,
                    blocking.state AS blocker_state,
                    EXTRACT(EPOCH FROM (now() - blocking.state_change))::bigint
                        AS blocker_state_age_sec,
                    LEFT(COALESCE(blocking.query, ''), 200) AS blocker_query,
                    blocked.pid AS blocked_pid,
                    COALESCE(blocked.application_name, '') AS blocked_app,
                    EXTRACT(
                        EPOCH FROM (now() - COALESCE(bl.wait_started, blocked.state_change))
                    )::bigint AS blocked_age_sec,
                    LEFT(COALESCE(blocked.query, ''), 200) AS blocked_query
                FROM pg_stat_activity AS blocked
                JOIN pg_stat_activity AS blocking
                    ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
                LEFT JOIN LATERAL (
                    SELECT MIN(waitstart) AS wait_started
                    FROM pg_catalog.pg_locks
                    WHERE pid = blocked.pid
                        AND NOT granted
                        AND waitstart IS NOT NULL
                ) bl ON TRUE
                WHERE blocked.datname = current_database()
                    AND blocking.datname = current_database()
                ORDER BY blocking.pid, blocked_age_sec DESC
                """
            )
            rows = cursor.fetchall()
    except psycopg.errors.InsufficientPrivilege:
        return CheckResult(
            name="blocking_queries",
            label="Blocking queries",
            status="skipped",
            summary="insufficient privilege to read pg_stat_activity",
            items=[],
            message="Grant pg_read_all_stats to this role for this check.",
            tier="warning",
        )
    except psycopg.errors.DatabaseError as e:
        # Defensive catch: if pg_stat_activity/pg_locks hit an unexpected
        # server-side condition (column rename on a newer major, catalog
        # lock conflict, etc.), skip gracefully instead of letting the
        # failure cascade to run_all_checks.
        return CheckResult(
            name="blocking_queries",
            label="Blocking queries",
            status="skipped",
            summary="query failed",
            items=[],
            message=f"Blocking-queries probe failed: {e}",
            tier="warning",
        )

    grouped = _group_blocker_rows(rows)

    items: list[CheckItem] = []
    worst_severity: CheckStatus = "ok"
    for blocker_pid, info in grouped.items():
        # Age of the longest-waiting victim — that's how urgent this is.
        oldest_victim_age = max((v["age"] for v in info["victims"]), default=0)

        if oldest_victim_age >= critical_age_sec:
            severity: CheckStatus = "critical"
        elif oldest_victim_age >= warn_age_sec:
            severity = "warning"
        else:
            # Blocker exists but nobody has waited long enough to report.
            continue

        worst_severity = _escalate(worst_severity, severity)
        items.append(_blocker_item(blocker_pid, info, oldest_victim_age))

    if worst_severity == "critical":
        summary = f"{len(items)} blocker(s) with critical-age waiters"
    elif items:
        summary = f"{len(items)} blocker(s)"
    else:
        summary = "none"

    return CheckResult(
        name="blocking_queries",
        label="Blocking queries",
        status=worst_severity if items else "ok",
        summary=summary,
        items=items,
        message="",
        tier="warning",
    )
209
210
def check_long_running_connections(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Client connections stuck in a transaction or running a query for too long.

    Idle-in-transaction holds row locks and blocks autovacuum; prolonged
    active queries often indicate a bad plan, a migration gone wrong, or an
    unindexed cleanup job. Excludes this backend and non-client backends
    (autovacuum workers, walsenders, etc).

    Fires warning/critical at 60s/10min idle-in-transaction and at
    5min/30min for active queries. ``table_owners`` is part of the common
    check signature and unused here.
    """
    idle_warn_seconds = 60
    idle_critical_seconds = 600  # 10 minutes
    active_warn_seconds = 300  # 5 minutes
    active_critical_seconds = 1800  # 30 minutes

    # Probe inside psycopg's `transaction()` so a permission-denied error
    # rolls back cleanly (via fresh txn in autocommit mode, savepoint in
    # transaction mode) and doesn't cascade-fail later checks.
    try:
        with cursor.connection.transaction():
            cursor.execute("""
                SELECT
                    pid,
                    COALESCE(application_name, '') AS application_name,
                    COALESCE(usename, '') AS usename,
                    state,
                    EXTRACT(EPOCH FROM (now() - state_change))::bigint AS state_age_sec,
                    EXTRACT(EPOCH FROM (now() - query_start))::bigint AS query_age_sec,
                    EXTRACT(EPOCH FROM (now() - xact_start))::bigint AS xact_age_sec,
                    LEFT(COALESCE(query, ''), 200) AS query
                FROM pg_catalog.pg_stat_activity
                WHERE datname = current_database()
                    AND pid <> pg_backend_pid()
                    AND backend_type = 'client backend'
                    AND state IS NOT NULL
            """)
            rows = cursor.fetchall()
    except psycopg.errors.InsufficientPrivilege:
        return CheckResult(
            name="long_running_connections",
            label="Long-running connections",
            status="skipped",
            summary="insufficient privilege to read pg_stat_activity",
            items=[],
            message="Grant pg_read_all_stats to this role for this check.",
            tier="warning",
        )
    except psycopg.errors.DatabaseError as e:
        # Defensive catch, mirroring check_blocking_queries: an unexpected
        # server-side failure in the probe (catalog change on a newer
        # major, lock conflict, etc.) should skip this check gracefully
        # instead of cascading the exception to run_all_checks.
        return CheckResult(
            name="long_running_connections",
            label="Long-running connections",
            status="skipped",
            summary="query failed",
            items=[],
            message=f"Long-running-connections probe failed: {e}",
            tier="warning",
        )

    items: list[CheckItem] = []
    worst_severity: CheckStatus = "ok"
    for pid, app_name, usename, state, state_age, query_age, xact_age, query in rows:
        # NULL timestamps (e.g. no transaction open) come back as None.
        state_age = state_age or 0
        query_age = query_age or 0
        xact_age = xact_age or 0

        kind: str | None = None  # None means "nothing to report for this row"
        age: int = 0
        severity: CheckStatus = "ok"

        if state in ("idle in transaction", "idle in transaction (aborted)"):
            # Age the transaction by xact_start, not state_change — what matters
            # is how long this transaction has been open (holding locks, blocking
            # autovacuum), not just how long it's been sitting idle. A session
            # that spent 20 minutes in an open txn and idled 5 seconds ago is
            # still a 20-minute problem.
            age = xact_age or state_age
            if age >= idle_critical_seconds:
                kind, severity = state, "critical"
            elif age >= idle_warn_seconds:
                kind, severity = state, "warning"
        elif state == "active":
            age = query_age
            if age >= active_critical_seconds:
                kind, severity = "active query", "critical"
            elif age >= active_warn_seconds:
                kind, severity = "active query", "warning"

        if kind is None:
            continue

        worst_severity = _escalate(worst_severity, severity)

        app = f" [{app_name}]" if app_name else ""
        query_excerpt = query.strip() if query else ""
        if query_excerpt:
            query_excerpt = f" query: {query_excerpt}"

        items.append(
            CheckItem(
                table="",
                name=f"pid {pid}{app}",
                detail=f"{kind} for {age}s (user: {usename}){query_excerpt}",
                source="",
                package="",
                model_class="",
                model_file="",
                suggestion=(
                    "Investigate the client — if stuck, "
                    f"run: SELECT pg_terminate_backend({pid});"
                ),
                caveats=[],
            )
        )

    if worst_severity == "critical":
        summary = f"{len(items)} stuck connections (critical)"
    elif items:
        summary = f"{len(items)} long-running connections"
    else:
        summary = "all ok"

    return CheckResult(
        name="long_running_connections",
        label="Long-running connections",
        status=worst_severity if items else "ok",
        summary=summary,
        items=items,
        message="",
        tier="warning",
    )