1from __future__ import annotations
2
3from typing import Any, TypedDict
4
5import psycopg.errors
6
7# Types
8
9
class TableOwner(TypedDict):
    """Ownership record for a database table: which package defines it."""

    package_label: str  # label of the package whose models define the table
    source: str  # "app" | "package"
13
14
class CheckItem(TypedDict):
    """One flagged object (index, column, database, ...) within a check's results."""

    table: str  # owning table name, or "" when the finding is not table-scoped
    name: str  # identifier of the flagged object (index name, "table.column", ...)
    detail: str  # human-readable context (sizes, counts, percentages)
    source: str  # "app" | "package" | ""
    package: str  # package label or ""
    suggestion: str  # actionable next step (SQL statement or model change)
22
23
class CheckResult(TypedDict):
    """Outcome of a single health check."""

    name: str  # machine-readable check id (e.g. "unused_indexes")
    label: str  # human-readable title for display
    status: str  # "ok" | "warning" | "critical" | "skipped" | "error"
    summary: str  # one-line summary shown next to the status
    items: list[CheckItem]  # per-object findings; may be empty
    message: str  # extra guidance or error text; "" when not needed
31
32
33# Table ownership
34
35
def build_table_owners() -> dict[str, TableOwner]:
    """Map table names to their owning package and source (app vs dependency)."""
    # Imported lazily so this module can be loaded without the registries
    # being configured yet.
    from plain.packages import packages_registry
    from plain.postgres import models_registry

    owners: dict[str, TableOwner] = {}
    for config in packages_registry.get_package_configs():
        label = config.package_label
        origin = "app" if config.name.startswith("app.") else "package"
        for model in models_registry.get_models(package_label=label):
            # The model's own table, then any M2M through-tables it implies.
            table_names = [model.model_options.db_table]
            table_names.extend(
                m2m.m2m_db_table() for m2m in model._model_meta.local_many_to_many
            )
            for table_name in table_names:
                owners[table_name] = TableOwner(package_label=label, source=origin)
    return owners
57
58
59def _table_source(
60 table_name: str, table_owners: dict[str, TableOwner]
61) -> tuple[str, str]:
62 """Return (source, package) for a table name."""
63 owner = table_owners.get(table_name)
64 if owner:
65 return owner["source"], owner["package_label"]
66 return "", ""
67
68
69# Context
70
71
def _gather_table_sizes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> list[dict[str, Any]]:
    """Per-table size and row-count estimates from pg_class, largest first."""
    cursor.execute("""
        SELECT
            c.relname AS table_name,
            c.reltuples::bigint AS estimated_row_count,
            pg_total_relation_size(c.oid) AS total_size_bytes,
            pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
            pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
            pg_size_pretty(pg_indexes_size(c.oid)) AS indexes_size,
            (SELECT count(*) FROM pg_catalog.pg_index i WHERE i.indrelid = c.oid) AS index_count
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('r', 'p')
          AND n.nspname = 'public'
        ORDER BY total_size_bytes DESC
    """)
    tables: list[dict[str, Any]] = []
    for row in cursor.fetchall():
        source, package = _table_source(row[0], table_owners)
        tables.append(
            {
                "table": row[0],
                # reltuples can be -1 for never-analyzed tables; clamp to 0.
                "estimated_rows": max(row[1], 0),
                "total_size_bytes": row[2],
                "total_size": row[3],
                "table_size": row[4],
                "indexes_size": row[5],
                "index_count": row[6],
                "source": source,
                "package": package,
            }
        )
    return tables


def _gather_connections(cursor: Any) -> dict[str, Any]:
    """Active connections to this database vs the server's max_connections."""
    cursor.execute("""
        SELECT
            (SELECT count(*) FROM pg_catalog.pg_stat_activity
             WHERE datname = current_database()) AS active_connections,
            (SELECT setting::int FROM pg_catalog.pg_settings
             WHERE name = 'max_connections') AS max_connections
    """)
    row = cursor.fetchone()
    return {
        "active": row[0],
        "max": row[1],
    }


def _gather_stats_reset(cursor: Any) -> str | None:
    """ISO timestamp of the last statistics reset for this database, or None."""
    cursor.execute("""
        SELECT stats_reset
        FROM pg_catalog.pg_stat_database
        WHERE datname = current_database()
    """)
    row = cursor.fetchone()
    if row and row[0]:
        return row[0].isoformat()
    return None


def _gather_slow_queries(cursor: Any) -> tuple[str, list[dict[str, Any]]]:
    """Return (pg_stat_statements status, top queries by total execution time).

    Status is "available", "not_installed", or "no_permission".
    """
    cursor.execute("""
        SELECT EXISTS (
            SELECT 1 FROM pg_catalog.pg_extension WHERE extname = 'pg_stat_statements'
        )
    """)
    if not cursor.fetchone()[0]:
        return "not_installed", []

    try:
        cursor.execute("""
            SELECT
                calls,
                ROUND(total_exec_time::numeric, 2) AS total_time_ms,
                ROUND(mean_exec_time::numeric, 2) AS mean_time_ms,
                ROUND(
                    (100 * total_exec_time / NULLIF(SUM(total_exec_time) OVER (), 0))::numeric, 2
                ) AS pct_total_time,
                LEFT(query, 200) AS query
            FROM pg_stat_statements
            ORDER BY total_exec_time DESC
            LIMIT 10
        """)
        slow_queries = [
            {
                "calls": row[0],
                "total_time_ms": float(row[1]),
                "mean_time_ms": float(row[2]),
                "pct_total_time": float(row[3]),
                "query": row[4],
            }
            for row in cursor.fetchall()
        ]
    except psycopg.errors.DatabaseError:
        # Reading pg_stat_statements typically requires elevated privileges;
        # treat a failure here as "installed but not readable".
        return "no_permission", []
    return "available", slow_queries


def gather_context(cursor: Any, table_owners: dict[str, TableOwner]) -> dict[str, Any]:
    """Collect contextual information that isn't pass/fail but helps interpretation.

    Returns a dict with keys "tables", "connections", "stats_reset",
    "pg_stat_statements", and "slow_queries".
    """
    context: dict[str, Any] = {
        "tables": _gather_table_sizes(cursor, table_owners),
        "connections": _gather_connections(cursor),
        "stats_reset": _gather_stats_reset(cursor),
    }
    pgss_status, slow_queries = _gather_slow_queries(cursor)
    context["pg_stat_statements"] = pgss_status
    context["slow_queries"] = slow_queries
    return context
177
178
179# Checks
180
181
182def _format_bytes(nbytes: int) -> str:
183 value = float(nbytes)
184 for unit in ("B", "kB", "MB", "GB", "TB"):
185 if abs(value) < 1024:
186 if unit == "B":
187 return f"{int(value)} {unit}"
188 return f"{value:.1f} {unit}"
189 value /= 1024
190 return f"{value:.1f} PB"
191
192
193def _index_suggestion(
194 *,
195 source: str,
196 package: str,
197 app_suggestion: str,
198 unmanaged_suggestion: str,
199) -> str:
200 """Return the appropriate suggestion based on table ownership."""
201 if source == "app":
202 return app_suggestion
203 elif source == "package":
204 return f"Managed by {package} — not directly actionable in your app"
205 return unmanaged_suggestion
206
207
def check_invalid_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes from failed CREATE INDEX CONCURRENTLY — maintained on writes, never used for reads."""
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE NOT i.indisvalid
    """)

    items: list[CheckItem] = []
    for table, index, size in cursor.fetchall():
        src, pkg = _table_source(table, table_owners)
        drop_sql = f'DROP INDEX CONCURRENTLY "{index}";'
        items.append(
            CheckItem(
                table=table,
                name=index,
                detail=size,
                source=src,
                package=pkg,
                suggestion=_index_suggestion(
                    source=src,
                    package=pkg,
                    app_suggestion=f"Drop and re-run the migration that created it: {drop_sql}",
                    unmanaged_suggestion=drop_sql,
                ),
            )
        )

    return CheckResult(
        name="invalid_indexes",
        label="Invalid indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
    )
250
251
def check_duplicate_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes where one is a column-prefix of another on the same table.

    An index on (a) is redundant when a sibling index on (a, b) exists with
    matching operator classes on the shared prefix — the longer index can
    serve the same scans. Unique indexes are never flagged (they also enforce
    a constraint), and expression/partial indexes are filtered out in SQL
    because their column numbers alone don't describe their semantics.
    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            ci.relname AS index_name,
            i.indkey::int[] AS column_numbers,
            i.indclass::int[] AS opclass_numbers,
            i.indisunique,
            pg_size_pretty(pg_relation_size(ci.oid)) AS index_size,
            pg_relation_size(ci.oid) AS index_size_bytes
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid
        JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        WHERE n.nspname = 'public'
          AND i.indisvalid
          AND i.indexprs IS NULL
          AND i.indpred IS NULL
        ORDER BY ct.relname, ci.relname
    """)
    rows = cursor.fetchall()

    # Group by table — prefix redundancy is only meaningful between indexes
    # on the same table.
    by_table: dict[str, list[tuple[str, list[int], list[int], bool, str, int]]] = {}
    for table_name, index_name, cols, opclasses, is_unique, size, size_bytes in rows:
        by_table.setdefault(table_name, []).append(
            (index_name, cols, opclasses, is_unique, size, size_bytes)
        )

    items: list[CheckItem] = []
    flagged: set[str] = set()  # avoid reporting the same index multiple times
    for table_name, indexes in by_table.items():
        # Compare every unordered pair of indexes on this table.
        for i, idx_a in enumerate(indexes):
            for idx_b in indexes[i + 1 :]:
                # Check both directions: is either a prefix of the other?
                for shorter, longer in [(idx_a, idx_b), (idx_b, idx_a)]:
                    name_s, cols_s, ops_s, unique_s, size_s, _ = shorter
                    name_l, cols_l, ops_l, _, _, _ = longer
                    if (
                        name_s not in flagged
                        # Strict prefix only: identical column lists don't match.
                        and len(cols_s) < len(cols_l)
                        and cols_l[: len(cols_s)] == cols_s
                        # Operator classes must also match for the longer index
                        # to actually substitute for the shorter one.
                        and ops_l[: len(cols_s)] == ops_s
                        and not unique_s  # unique indexes serve a constraint purpose
                    ):
                        source, package = _table_source(table_name, table_owners)
                        app_suggestion = f'Remove "{name_s}" from model indexes/constraints, then run plain postgres sync'

                        items.append(
                            CheckItem(
                                table=table_name,
                                name=name_s,
                                detail=f"{size_s}, redundant with {name_l}",
                                source=source,
                                package=package,
                                suggestion=_index_suggestion(
                                    source=source,
                                    package=package,
                                    app_suggestion=app_suggestion,
                                    unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{name_s}";',
                                ),
                            )
                        )
                        flagged.add(name_s)

    return CheckResult(
        name="duplicate_indexes",
        label="Duplicate indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
    )
328
329
def check_unused_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes with zero scans since stats reset, excluding unique/expression/constraint-backing.

    Also excludes indexes that are the sole coverage for a FK column — even at
    0 scans, FK columns should always have index coverage for referential
    integrity enforcement (parent DELETE/UPDATE scans the child table).
    """
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size,
            pg_relation_size(s.indexrelid) AS index_size_bytes
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE s.idx_scan = 0
          AND pg_relation_size(s.indexrelid) > 1048576
          AND 0 <> ALL (i.indkey)
          AND NOT i.indisunique
          AND NOT EXISTS (
              SELECT 1 FROM pg_catalog.pg_constraint c
              WHERE c.conindid = s.indexrelid
          )
          AND i.indisvalid
          AND NOT (
              -- Leading column is a FK column on this table
              EXISTS (
                  SELECT 1 FROM pg_catalog.pg_constraint fk
                  WHERE fk.conrelid = i.indrelid
                    AND fk.contype = 'f'
                    AND array_length(fk.conkey, 1) = 1
                    AND fk.conkey[1] = i.indkey[0]
              )
              -- And no other valid index covers that column as its leading column
              AND NOT EXISTS (
                  SELECT 1 FROM pg_catalog.pg_index other
                  WHERE other.indrelid = i.indrelid
                    AND other.indexrelid != i.indexrelid
                    AND other.indisvalid
                    AND other.indkey[0] = i.indkey[0]
              )
          )
        ORDER BY pg_relation_size(s.indexrelid) DESC
    """)

    items: list[CheckItem] = []
    wasted_bytes = 0
    for table_name, index_name, index_size, index_size_bytes in cursor.fetchall():
        wasted_bytes += index_size_bytes
        source, package = _table_source(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=index_name,
                detail=f"{index_size}, 0 scans",
                source=source,
                package=package,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    app_suggestion=f'Remove "{index_name}" from model indexes/constraints, then run plain postgres sync',
                    unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{index_name}";',
                ),
            )
        )

    # Include the total reclaimable space in the summary when anything is flagged.
    summary = f"{len(items)} ({_format_bytes(wasted_bytes)})" if items else "none"

    return CheckResult(
        name="unused_indexes",
        label="Unused indexes",
        status="warning" if items else "ok",
        summary=summary,
        items=items,
        message="",
    )
412
413
def check_missing_fk_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Foreign key columns without a leading index — JOINs on these do sequential scans.

    Only single-column FKs are considered. An invalid index (left behind by a
    failed CREATE INDEX CONCURRENTLY) does not count as coverage: PostgreSQL
    never uses invalid indexes for reads, which is also consistent with
    check_unused_indexes filtering on indisvalid.
    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            a.attname AS column_name,
            cc.relname AS referenced_table,
            c.conname AS constraint_name
        FROM pg_catalog.pg_constraint c
        JOIN pg_catalog.pg_class ct ON ct.oid = c.conrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = c.conkey[1]
        JOIN pg_catalog.pg_class cc ON cc.oid = c.confrelid
        WHERE c.contype = 'f'
          AND array_length(c.conkey, 1) = 1
          AND n.nspname = 'public'
          AND NOT EXISTS (
              SELECT 1
              FROM pg_catalog.pg_index i
              WHERE i.indrelid = c.conrelid
                -- only valid indexes can actually serve reads
                AND i.indisvalid
                AND i.indkey[0] = c.conkey[1]
          )
        ORDER BY ct.relname, a.attname
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, column_name, referenced_table, constraint_name in rows:
        source, package = _table_source(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=f"{table_name}.{column_name}",
                detail=f"references {referenced_table}",
                source=source,
                package=package,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    app_suggestion=f'Add an Index on ["{column_name}"] to the model, then run plain postgres sync',
                    unmanaged_suggestion=f'CREATE INDEX CONCURRENTLY ON "{table_name}" ("{column_name}");',
                ),
            )
        )

    return CheckResult(
        name="missing_fk_indexes",
        label="Missing FK indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
    )
469
470
def check_sequence_exhaustion(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Identity sequences approaching their type max."""
    cursor.execute("""
        WITH sequences AS (
            SELECT
                s.seqrelid,
                s.seqtypid,
                s.seqmax,
                ps.last_value,
                ps.start_value
            FROM pg_catalog.pg_sequence s
            JOIN pg_catalog.pg_class c ON c.oid = s.seqrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_sequences ps ON ps.schemaname = n.nspname
                AND ps.sequencename = c.relname
        )
        SELECT
            d.refobjid::regclass AS table_name,
            a.attname AS column_name,
            seq.seqtypid::regtype AS data_type,
            COALESCE(seq.last_value, seq.start_value) AS current_value,
            seq.seqmax AS max_value,
            ROUND(
                100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
            ) AS pct_used
        FROM sequences seq
        JOIN pg_catalog.pg_depend d ON d.objid = seq.seqrelid
            AND d.deptype IN ('a', 'i')
            AND d.classid = 'pg_class'::regclass
            AND d.refclassid = 'pg_class'::regclass
        JOIN pg_catalog.pg_attribute a ON a.attrelid = d.refobjid
            AND a.attnum = d.refobjsubid
        WHERE ROUND(
            100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
        ) > 50
        ORDER BY pct_used DESC
    """)

    items: list[CheckItem] = []
    worst_pct = 0.0
    for row in cursor.fetchall():
        table_name, column_name, data_type, current_value, max_value, pct_used = row
        worst_pct = max(worst_pct, float(pct_used))
        # regclass values come back as non-str objects; normalize for display.
        table_str = str(table_name)
        source, package = _table_source(table_str, table_owners)
        items.append(
            CheckItem(
                table=table_str,
                name=f"{table_str}.{column_name}",
                detail=f"{data_type}, {pct_used}% used ({current_value:,} / {max_value:,})",
                source=source,
                package=package,
                suggestion=f'ALTER TABLE "{table_str}" ALTER COLUMN "{column_name}" SET DATA TYPE bigint;',
            )
        )

    if worst_pct >= 90:
        status = "critical"
    elif items:
        status = "warning"
    else:
        status = "ok"

    return CheckResult(
        name="sequence_exhaustion",
        label="Sequence exhaustion",
        status=status,
        summary=f"{worst_pct}% worst" if items else "all ok",
        items=items,
        message="",
    )
545
546
def check_xid_wraparound(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Transaction ID age approaching the 2B wraparound limit."""
    cursor.execute("""
        SELECT
            datname,
            age(datfrozenxid) AS xid_age,
            ROUND(100.0 * age(datfrozenxid) / 2147483648, 2) AS pct_towards_wraparound
        FROM pg_catalog.pg_database
        WHERE datallowconn
        ORDER BY age(datfrozenxid) DESC
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    worst_pct = 0.0
    for datname, xid_age, pct in rows:
        pct_value = float(pct)
        # Only databases past 25% are worth surfacing as findings.
        if pct_value <= 25:
            continue
        worst_pct = max(worst_pct, pct_value)
        items.append(
            CheckItem(
                table="",
                name=datname,
                detail=f"{pct}% towards wraparound ({xid_age:,} XIDs)",
                source="",
                package="",
                suggestion="Investigate autovacuum health, consider VACUUM FREEZE",
            )
        )

    if worst_pct >= 40:
        status = "critical"
    elif items:
        status = "warning"
    else:
        status = "ok"

    # rows is ordered by XID age DESC, so rows[0] is the database closest to
    # wraparound (not necessarily the current database); show its percentage
    # even when everything is below the reporting threshold.
    top_pct = float(rows[0][2]) if rows else 0
    summary = f"{worst_pct}%" if items else f"{top_pct}%"

    return CheckResult(
        name="xid_wraparound",
        label="XID wraparound",
        status=status,
        summary=summary,
        items=items,
        message="",
    )
598
599
def _check_hit_ratio(
    cursor: Any,
    *,
    name: str,
    label: str,
    catalog_table: str,
    hit_col: str,
    read_col: str,
) -> CheckResult:
    """Hit ratio check — below 98.5% indicates insufficient shared_buffers or RAM."""
    # Column/table names are caller-supplied constants, not user input.
    cursor.execute(f"""
        SELECT ROUND(
            100.0 * SUM({hit_col}) / NULLIF(SUM({hit_col}) + SUM({read_col}), 0), 2
        ) FROM {catalog_table}
    """)
    row = cursor.fetchone()
    if not row or row[0] is None:
        # No blocks read at all yet — treat as a perfect ratio.
        ratio = 100.0
    else:
        ratio = float(row[0])

    healthy = ratio >= 98.5
    return CheckResult(
        name=name,
        label=label,
        status="ok" if healthy else "warning",
        summary=f"{ratio}%",
        items=[],
        message="" if healthy else "Consider increasing shared_buffers or adding more RAM",
    )
628
629
def check_cache_hit_ratio(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Heap-block cache hit ratio across user tables."""
    # table_owners is unused; the parameter keeps the uniform check signature.
    params = {
        "name": "cache_hit_ratio",
        "label": "Cache hit ratio",
        "catalog_table": "pg_catalog.pg_statio_user_tables",
        "hit_col": "heap_blks_hit",
        "read_col": "heap_blks_read",
    }
    return _check_hit_ratio(cursor, **params)
641
642
def check_index_hit_ratio(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Index-block cache hit ratio across user indexes."""
    # table_owners is unused; the parameter keeps the uniform check signature.
    params = {
        "name": "index_hit_ratio",
        "label": "Index hit ratio",
        "catalog_table": "pg_catalog.pg_statio_user_indexes",
        "hit_col": "idx_blks_hit",
        "read_col": "idx_blks_read",
    }
    return _check_hit_ratio(cursor, **params)
654
655
def check_vacuum_health(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Tables with significant dead tuple accumulation."""
    cursor.execute("""
        SELECT
            relname AS table_name,
            n_dead_tup,
            n_live_tup,
            CASE WHEN n_live_tup > 0
                THEN ROUND(100.0 * n_dead_tup / n_live_tup, 2)
                ELSE 0
            END AS dead_tuple_pct,
            last_autovacuum
        FROM pg_catalog.pg_stat_user_tables
        WHERE n_dead_tup > 1000
        ORDER BY n_dead_tup DESC
    """)

    items: list[CheckItem] = []
    for table_name, n_dead, n_live, dead_pct, last_vacuum in cursor.fetchall():
        # Only flag tables where dead tuples exceed 10% of live tuples.
        if float(dead_pct) <= 10:
            continue
        vacuum_info = "never"
        if last_vacuum:
            # Trim to "YYYY-MM-DD HH:MM:SS" for compact display.
            vacuum_info = str(last_vacuum)[:19]
        source, package = _table_source(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                detail=f"{n_dead:,} dead tuples ({dead_pct}% of live), last vacuum: {vacuum_info}",
                source=source,
                package=package,
                suggestion="Investigate autovacuum — it may be falling behind on this table",
            )
        )

    return CheckResult(
        name="vacuum_health",
        label="Vacuum health",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables need attention" if items else "all ok",
        items=items,
        message="",
    )
701
702
# Registry of all health checks, in report order. Each entry takes
# (cursor, table_owners) and returns a CheckResult; run_all_checks() iterates
# this list.
ALL_CHECKS = [
    check_invalid_indexes,
    check_duplicate_indexes,
    check_unused_indexes,
    check_missing_fk_indexes,
    check_sequence_exhaustion,
    check_xid_wraparound,
    check_cache_hit_ratio,
    check_index_hit_ratio,
    check_vacuum_health,
]
714
715
def run_all_checks(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> tuple[list[CheckResult], dict[str, Any]]:
    """Run every registered check, then gather interpretive context.

    A failing check is reported as a CheckResult with status "error" instead
    of aborting the remaining checks.
    """
    results: list[CheckResult] = []
    for check_fn in ALL_CHECKS:
        try:
            result = check_fn(cursor, table_owners)
        except Exception as e:
            # Broad catch is deliberate: one broken check (missing privilege,
            # unexpected catalog shape) must not take down the whole report.
            check_name = check_fn.__name__.removeprefix("check_")
            result = CheckResult(
                name=check_name,
                label=check_name.replace("_", " ").title(),
                status="error",
                summary="error",
                items=[],
                message=str(e),
            )
        results.append(result)

    context = gather_context(cursor, table_owners)
    return results, context