  1"""Cumulative checks — depend on counters accumulated since the last
  2pg_stat_reset. Unreliable on freshly-reset clusters.
  3
  4`check_stats_freshness`, `check_vacuum_health`, and `check_index_bloat`
  5are rendered as operational context (tier="operational") because the
  6remedy is DB-side (ANALYZE/VACUUM/REINDEX) and can't be expressed in
  7the user's model code today. `check_unused_indexes` and
  8`check_missing_index_candidates` stay in the warning tier because the
  9user can act on them by editing model indexes/constraints."""

from __future__ import annotations

from typing import Any

import psycopg.errors

from .helpers import (
    _display_path,
    _format_bytes,
    _index_suggestion,
    _pgss_usable,
    _top_queries_for_table,
)
from .ownership import _table_info
from .types import CheckItem, CheckResult, TableOwner


def check_unused_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes with zero scans since stats reset, excluding unique/expression/constraint-backing.

    Also excludes indexes that are the sole coverage for a FK column — even at
    0 scans, FK columns should always have index coverage for referential
    integrity enforcement (parent DELETE/UPDATE scans the child table).
    """
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size,
            pg_relation_size(s.indexrelid) AS index_size_bytes
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE s.idx_scan = 0
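          -- 1048576 = 1 MiB floor: dropping anything smaller saves nothing.
          -- 0 <> ALL (indkey) skips expression indexes, whose expression
          -- columns are recorded in indkey as attnum 0.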
          AND pg_relation_size(s.indexrelid) > 1048576
          AND 0 <> ALL (i.indkey)
          AND NOT i.indisunique
          AND NOT EXISTS (
              SELECT 1 FROM pg_catalog.pg_constraint c
              WHERE c.conindid = s.indexrelid
          )
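          -- indisvalid: skip half-built leftovers from a failed
          -- CREATE INDEX CONCURRENTLY; those are a separate problem.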
          AND i.indisvalid
          AND NOT (
              -- Leading column is a FK column on this table
              EXISTS (
                  SELECT 1 FROM pg_catalog.pg_constraint fk
                  WHERE fk.conrelid = i.indrelid
                    AND fk.contype = 'f'
                    AND array_length(fk.conkey, 1) = 1
                    AND fk.conkey[1] = i.indkey[0]
              )
              -- And no other valid index covers that column as its leading column
              AND NOT EXISTS (
                  SELECT 1 FROM pg_catalog.pg_index other
                  WHERE other.indrelid = i.indrelid
                    AND other.indexrelid != i.indexrelid
                    AND other.indisvalid
                    AND other.indkey[0] = i.indkey[0]
              )
          )
        ORDER BY pg_relation_size(s.indexrelid) DESC
    """)
    rows = cursor.fetchall()

    total_bytes = 0
    items: list[CheckItem] = []
    for table_name, index_name, index_size, index_size_bytes in rows:
        total_bytes += index_size_bytes
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=index_name,
                detail=f"{index_size}, 0 scans",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    model_class=model_class,
                    model_file=model_file,
                    app_suggestion=f'Remove "{index_name}" from model indexes/constraints, then run plain postgres sync',
                    unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{index_name}";',
                ),
                caveats=[],
            )
        )

    if items:
        summary = f"{len(items)} ({_format_bytes(total_bytes)})"
    else:
        summary = "none"

    return CheckResult(
        name="unused_indexes",
        label="Unused indexes",
        status="warning" if items else "ok",
        summary=summary,
        items=items,
        message="",
        tier="warning",
    )


def check_vacuum_health(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Tables with significant dead tuple accumulation."""
    # NULLIF guards the division: rows with n_live_tup=0 produce NULL for
    # the ratio, which fails the >10 comparison and is excluded. Belt and
    # suspenders — WHERE predicate order isn't guaranteed to short-circuit.
    cursor.execute("""
        SELECT
            relname AS table_name,
            n_dead_tup,
            ROUND(100.0 * n_dead_tup / NULLIF(n_live_tup, 0), 2) AS dead_tuple_pct,
            last_autovacuum
        FROM pg_catalog.pg_stat_user_tables
        WHERE n_live_tup > 0
          AND n_dead_tup > 1000
          AND 100.0 * n_dead_tup / NULLIF(n_live_tup, 0) > 10
        ORDER BY n_dead_tup DESC
        LIMIT 100
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, n_dead, dead_pct, last_vacuum in rows:
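        # last_autovacuum is a full timestamptz; slice to 19 chars to keep
        # just "YYYY-MM-DD HH:MM:SS" for display.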
        vacuum_info = str(last_vacuum)[:19] if last_vacuum else "never"
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                detail=f"{n_dead:,} dead tuples ({dead_pct}% of live), last vacuum: {vacuum_info}",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=(
                    f'Run VACUUM (ANALYZE) "{table_name}"; to reclaim dead '
                    "tuples immediately. If this recurs, tune per-table "
                    "autovacuum_vacuum_scale_factor or investigate why "
                    "autovacuum is being starved (long-running "
                    "transactions, write volume)."
                ),
                caveats=[],
            )
        )

    return CheckResult(
        name="vacuum_health",
        label="Vacuum health",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables need attention" if items else "all ok",
        items=items,
        message="",
        tier="operational",
    )


def check_index_bloat(cursor: Any, table_owners: dict[str, TableOwner]) -> CheckResult:
    """Indexes with significant estimated wasted space.

    Uses the ioguix btree-bloat estimator (same heuristic as pghero) to
    estimate wasted bytes per index. The estimator is approximate — it
    compares actual pages to the number required by the average tuple size
    from pg_statistic. Accuracy drops on tables with atypical distributions,
    but it's been battle-tested and flags real problems in practice.

    Only considers btree indexes; other AMs (gin, gist, hash, brin) have
    different bloat characteristics and aren't covered here. Partial
    indexes are included but estimated using full-table column widths
    from pg_stats (not filtered by the predicate), which tends to
    underreport actual bloat on highly selective partial indexes.

    Thresholds: ≥30% bloat ratio AND ≥100 MB wasted bytes. Indexes get a
    higher percentage bar than tables (25%) because REINDEX CONCURRENTLY
    is cheap and routine, so an index is only worth surfacing once it is
    genuinely degraded.
194    """
195    min_bloat_bytes = 100 * 1024 * 1024  # 100 MB
196    min_bloat_pct = 30
197
198    # Cheap pre-check: if no public-schema btree index is even large enough
199    # to plausibly contain the minimum bloat, skip the expensive estimator.
200    # Scope must match the estimator's WHERE nspname = 'public' or large
201    # catalog/extension indexes would defeat the fast-exit.
202    cursor.execute(
203        """
204        SELECT 1
205        FROM pg_class c
206        JOIN pg_am am ON c.relam = am.oid
207        JOIN pg_namespace n ON n.oid = c.relnamespace
208        WHERE c.relkind = 'i'
209          AND am.amname = 'btree'
210          AND n.nspname = 'public'
211          AND c.relpages * current_setting('block_size')::bigint >= %(min)s
212        LIMIT 1
213        """,
214        {"min": min_bloat_bytes},
215    )
216    if not cursor.fetchone():
217        return CheckResult(
218            name="index_bloat",
219            label="Index bloat",
220            status="ok",
221            summary="none",
222            items=[],
223            message="",
224            tier="operational",
225        )
226
227    # The bloat estimator joins pg_stats and can fail on tables that have
228    # never been analyzed. Probe inside psycopg's `transaction()` so that
229    # failure rolls back cleanly without affecting later checks.
230    bloat_sql = """
231        WITH btree_index_atts AS (
232            SELECT
233                nspname, relname, reltuples, relpages, indrelid, relam,
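                -- explode the indkey int2vector into one row per indexed column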
                regexp_split_to_table(indkey::text, ' ')::smallint AS attnum,
                indexrelid AS index_oid
            FROM pg_index
            JOIN pg_class ON pg_class.oid = pg_index.indexrelid
            JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
            JOIN pg_am ON pg_class.relam = pg_am.oid
            WHERE pg_am.amname = 'btree'
              AND nspname = 'public'
        ),
        index_item_sizes AS (
            SELECT
                i.nspname,
                i.relname,
                i.reltuples,
                i.relpages,
                i.relam,
                (quote_ident(s.schemaname) || '.' || quote_ident(s.tablename))::regclass AS starelid,
                a.attrelid AS table_oid,
                index_oid,
                current_setting('block_size')::numeric AS bs,
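                -- MAXALIGN: 8 on 64-bit builds (and mingw32), else 4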
                CASE
                    WHEN version() ~ 'mingw32' OR version() ~ '64-bit' THEN 8
                    ELSE 4
                END AS maxalign,
                24 AS pagehdr,
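                -- estimator's per-tuple header term: 2 bytes, or 6 when a
                -- null bitmap is needed (some column is nullable)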
                CASE WHEN max(coalesce(s.null_frac, 0)) = 0
                    THEN 2 ELSE 6
                END AS index_tuple_hdr,
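                -- average data bytes per tuple, discounting NULLs; 2048 is
                -- the fallback width when pg_stats has no avg_width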
                sum((1 - coalesce(s.null_frac, 0)) * coalesce(s.avg_width, 2048)) AS nulldatawidth
            FROM pg_attribute AS a
            JOIN pg_stats AS s
                ON (quote_ident(s.schemaname) || '.' || quote_ident(s.tablename))::regclass = a.attrelid
                AND s.attname = a.attname
            JOIN btree_index_atts AS i
                ON i.indrelid = a.attrelid AND a.attnum = i.attnum
            WHERE a.attnum > 0
            GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9
        ),
        index_aligned AS (
            SELECT
                maxalign, bs, nspname,
                relname AS index_name,
                reltuples, relpages, relam, table_oid, index_oid,
                (2 +
                    maxalign - CASE
                        WHEN index_tuple_hdr %% maxalign = 0 THEN maxalign
                        ELSE index_tuple_hdr %% maxalign
                    END
                    + nulldatawidth + maxalign - CASE
                        WHEN nulldatawidth::integer %% maxalign = 0 THEN maxalign
                        ELSE nulldatawidth::integer %% maxalign
                    END
                )::numeric AS nulldatahdrwidth,
                pagehdr
            FROM index_item_sizes
        ),
        otta_calc AS (
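            -- otta: pages a freshly-built index would need for the same
            -- tuples, plus one meta page for btree/hash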
            SELECT
                bs, nspname, table_oid, index_oid, index_name, relpages,
                coalesce(
                    ceil((reltuples * (4 + nulldatahdrwidth)) / (bs - pagehdr::float)) +
                    CASE WHEN am.amname IN ('hash', 'btree') THEN 1 ELSE 0 END,
                    0
                ) AS otta
            FROM index_aligned AS s2
            LEFT JOIN pg_am am ON s2.relam = am.oid
        ),
        raw_bloat AS (
            SELECT
                nspname,
                c.relname AS table_name,
                index_name,
                bs * sub.relpages::bigint AS totalbytes,
                CASE WHEN sub.relpages <= otta THEN 0
                    ELSE bs * (sub.relpages - otta)::bigint END AS wastedbytes,
                CASE WHEN sub.relpages <= otta THEN 0
                    ELSE (bs * (sub.relpages - otta)::bigint * 100)
                         / NULLIF(bs * sub.relpages::bigint, 0) END AS realbloat,
                stat.indexrelid
            FROM otta_calc AS sub
            JOIN pg_class AS c ON c.oid = sub.table_oid
            JOIN pg_stat_user_indexes AS stat ON sub.index_oid = stat.indexrelid
        )
        SELECT
            table_name,
            index_name,
            wastedbytes,
            totalbytes,
            realbloat,
            i.indisprimary
        FROM raw_bloat rb
        JOIN pg_index i ON i.indexrelid = rb.indexrelid
        WHERE wastedbytes >= %(min_bytes)s
          AND realbloat >= %(min_pct)s
        ORDER BY wastedbytes DESC
        LIMIT 100
    """
    try:
        with cursor.connection.transaction():
            cursor.execute(
                bloat_sql, {"min_bytes": min_bloat_bytes, "min_pct": min_bloat_pct}
            )
            rows = cursor.fetchall()
    except psycopg.errors.DatabaseError:
        return CheckResult(
            name="index_bloat",
            label="Index bloat",
            status="skipped",
            summary="could not estimate",
            items=[],
            message="Bloat estimator query failed (may require ANALYZE on target tables).",
            tier="operational",
        )

    items: list[CheckItem] = []
    for table_name, index_name, wasted, total, pct, is_primary in rows:
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        pct_str = f"{float(pct):.0f}%"
        primary_note = " (PRIMARY KEY)" if is_primary else ""
        if is_primary:
            fix = (
                f'Requires care — REINDEX INDEX CONCURRENTLY "{index_name}"; '
                "on a PK rebuilds it without locking writes. Monitor locks."
            )
        else:
            fix = f'REINDEX INDEX CONCURRENTLY "{index_name}";'
        items.append(
            CheckItem(
                table=table_name,
                name=index_name,
                detail=(
                    f"{_format_bytes(wasted)} wasted ({pct_str} of "
                    f"{_format_bytes(total)}){primary_note}"
                ),
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=fix,
                caveats=[],
            )
        )

    return CheckResult(
        name="index_bloat",
        label="Index bloat",
        status="warning" if items else "ok",
        summary=f"{len(items)} indexes bloated" if items else "none",
        items=items,
        message="",
        tier="operational",
    )


def check_table_bloat(cursor: Any, table_owners: dict[str, TableOwner]) -> CheckResult:
    """Tables with significant estimated wasted space (page-level bloat).

    Complements `check_vacuum_health`. Dead-tuple counts only show what
    autovacuum hasn't reclaimed yet — a table that autovacuum has been
    running on regularly can still carry gigabytes of bloat because plain
    VACUUM marks pages reusable but does not return space to the OS.
    `n_dead_tup` is therefore blind to the "VACUUM ran but the table never
    shrank" case, which is the common shape of bloat on high-churn tables
    (caches, queues, soft-deleted history). A size-based estimator catches
    it.

    Uses the ioguix table-bloat estimator (same heuristic as pghero) to
    estimate wasted bytes per table by comparing actual relation pages to
    the number expected from `pg_stats` average column widths times row
    count. Approximate — accuracy drops on tables with atypical
    distributions or wide TOAST'd columns — but battle-tested.

    Thresholds: ≥25% bloat ratio AND ≥100 MB wasted bytes. Both filters are
    required: the percentage is the "is this table actually unhealthy"
    signal, the byte floor is the "is it worth fixing" filter. Tables get a
    lower percentage bar than indexes (30%) because rewriting a table is
    more disruptive (pg_repack/pg_squeeze or VACUUM FULL) than REINDEX
    CONCURRENTLY.
411    """
412    min_bloat_bytes = 100 * 1024 * 1024  # 100 MB
413    min_bloat_pct = 25
414
415    # Cheap pre-check: if no public-schema table's main relation is large
416    # enough to plausibly carry the minimum bloat, skip the expensive
417    # estimator. Note: this counts only the heap (relpages), not TOAST —
418    # tables whose bloat is concentrated in a TOAST'd column (large
419    # text/jsonb) could be skipped here even though the full estimator
420    # would qualify them. Acceptable for a fast-exit; the alternative is
421    # joining pg_class twice for every DB.
422    cursor.execute(
423        """
424        SELECT 1
425        FROM pg_class c
426        JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = 'public'
427        WHERE c.relkind = 'r'
428          AND c.relpages * current_setting('block_size')::bigint >= %(min_bytes)s
429        LIMIT 1
430        """,
431        {"min_bytes": min_bloat_bytes},
432    )
433    if not cursor.fetchone():
434        return CheckResult(
435            name="table_bloat",
436            label="Table bloat",
437            status="ok",
438            summary="none",
439            items=[],
440            message="",
441            tier="operational",
442        )
443
444    # Standard ioguix table-bloat estimator — joins pg_stats, so it can
445    # fail on never-analyzed tables. Probe inside a transaction so that
446    # failure rolls back cleanly without affecting later checks.
447    bloat_sql = """
448        WITH constants AS (
449            SELECT current_setting('block_size')::numeric AS bs,
450                   23 AS hdr,
451                   8 AS ma
452        ),
453        null_headers AS (
454            SELECT
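                -- nullhdr: tuple header plus null-bitmap bytes (one bit per
                -- column that ever holds NULLs)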
                hdr + 1 + (sum(CASE WHEN s.null_frac <> 0 THEN 1 ELSE 0 END) / 8) AS nullhdr,
                SUM((1 - s.null_frac) * s.avg_width) AS datawidth,
                MAX(s.null_frac) AS maxfracsum,
                s.schemaname,
                s.tablename,
                hdr, ma, bs
            FROM pg_stats s
            CROSS JOIN constants
            WHERE s.schemaname = 'public'
            GROUP BY s.schemaname, s.tablename, hdr, ma, bs
        ),
        data_headers AS (
            SELECT
                ma, bs, hdr, schemaname, tablename,
                (datawidth + (hdr + ma - (CASE WHEN hdr %% ma = 0 THEN ma ELSE hdr %% ma END)))::numeric AS datahdr,
                (maxfracsum * (nullhdr + ma - (CASE WHEN nullhdr %% ma = 0 THEN ma ELSE nullhdr %% ma END))) AS nullhdr2
            FROM null_headers
        ),
        table_estimates AS (
            SELECT
                nh.schemaname, nh.tablename, nh.bs,
                c.reltuples::numeric AS est_rows,
                c.relpages::bigint * nh.bs AS table_bytes,
                CEIL((c.reltuples *
                        (nh.datahdr + nh.nullhdr2 + 4 + nh.ma -
                            (CASE WHEN nh.datahdr %% nh.ma = 0 THEN nh.ma ELSE nh.datahdr %% nh.ma END)
                        )
                    ) / (nh.bs - 20)) * nh.bs AS expected_bytes,
                c.reltoastrelid
            FROM data_headers nh
            JOIN pg_namespace n ON n.nspname = nh.schemaname
            JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = nh.tablename
            WHERE c.relkind = 'r'
        ),
        with_toast AS (
            SELECT
                te.schemaname,
                te.tablename,
                te.est_rows,
                te.table_bytes + (COALESCE(t.relpages, 0) * te.bs)::bigint AS table_bytes,
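                -- TOAST chunks are ~2 KB, so roughly 4 chunk rows fit per
                -- 8 KB page, hence reltuples / 4 expected pages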
                te.expected_bytes + (CEIL(COALESCE(t.reltuples, 0) / 4) * te.bs)::bigint AS expected_bytes
            FROM table_estimates te
            LEFT JOIN pg_class t ON te.reltoastrelid = t.oid AND t.relkind = 't'
        )
        SELECT
            tablename,
            table_bytes,
            (table_bytes - expected_bytes)::bigint AS wasted_bytes,
            (100.0 * (table_bytes - expected_bytes) / NULLIF(table_bytes, 0)) AS bloat_pct,
            est_rows
        FROM with_toast
        -- min_bytes is a positive byte count, so this filter also guarantees
        -- wasted_bytes > 0; the percentage filter below never sees a negative
        -- numerator from an over-estimated expected_bytes.
        WHERE (table_bytes - expected_bytes) >= %(min_bytes)s
          AND (100.0 * (table_bytes - expected_bytes) / NULLIF(table_bytes, 0)) >= %(min_pct)s
        ORDER BY wasted_bytes DESC
        LIMIT 100
    """
    try:
        with cursor.connection.transaction():
            cursor.execute(
                bloat_sql, {"min_bytes": min_bloat_bytes, "min_pct": min_bloat_pct}
            )
            rows = cursor.fetchall()
    except psycopg.errors.DatabaseError:
        return CheckResult(
            name="table_bloat",
            label="Table bloat",
            status="skipped",
            summary="could not estimate",
            items=[],
            message="Bloat estimator query failed (may require ANALYZE on target tables).",
            tier="operational",
        )

    items: list[CheckItem] = []
    for table_name, total, wasted, pct, est_rows in rows:
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        pct_str = f"{float(pct):.0f}%"
        # Lead with online-rewrite tools — VACUUM FULL works but takes an
        # ACCESS EXCLUSIVE lock for the duration, which is rarely
        # acceptable on a hot table. pg_repack and pg_squeeze rewrite
        # without blocking readers/writers; mention both since availability
        # depends on the host (Heroku/RDS ship pg_repack; PlanetScale ships
        # pg_squeeze).
        suggestion = (
            f'Reclaim with `pg_repack -t "{table_name}"` (extension) or '
            f"`SELECT squeeze.squeeze_table('public', '{table_name}')` "
            f"(pg_squeeze extension) for online rewrites. "
            f'`VACUUM FULL "{table_name}"` works but takes ACCESS EXCLUSIVE '
            f"on the table for the duration."
        )
        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                # reltuples is -1 on never-analyzed tables; clamp so we
                # don't render "~-1 rows".
                detail=(
                    f"{_format_bytes(wasted)} wasted ({pct_str} of "
                    f"{_format_bytes(total)}, ~{max(0, int(est_rows or 0)):,} rows)"
                ),
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=suggestion,
                caveats=[],
            )
        )

    return CheckResult(
        name="table_bloat",
        label="Table bloat",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables bloated" if items else "none",
        items=items,
        message="",
        tier="operational",
    )


def check_stats_freshness(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Tables whose planner statistics are missing or stale relative to activity.

    Stale stats cause the planner to make bad decisions and make every other
    stat-based check (unused_indexes, vacuum_health, missing-index heuristics,
    slow-query analysis) less trustworthy. Autovacuum's default analyze
    trigger is 10% row modification; we flag at 25% churn as a signal that
    autovacuum is falling behind, and always flag tables that have literally
    never been analyzed.
589    """
590    # pg_class.reltuples is the robust "never analyzed" signal: it's -1 until
591    # ANALYZE runs and populates it, and pg_stat_reset() does NOT clear it
592    # (pg_stat_reset only wipes the cumulative counters in pg_stat_*). Relying
593    # on last_analyze/last_autoanalyze alone would falsely flag every table
594    # after a stats reset even though pg_statistic is still populated.
595    cursor.execute("""
596        SELECT
597            st.relname AS table_name,
598            st.n_live_tup,
599            st.n_mod_since_analyze,
600            GREATEST(st.last_analyze, st.last_autoanalyze) AS last_any_analyze,
601            pg_relation_size(st.relid) AS bytes,
602            c.reltuples AS pg_class_reltuples
603        FROM pg_catalog.pg_stat_user_tables st
604        JOIN pg_catalog.pg_class c ON c.oid = st.relid
605        WHERE st.schemaname = 'public'
606    """)
607    rows = cursor.fetchall()
608
609    min_table_bytes = 1_000_000  # 1 MB — outer size floor for "worth flagging"
610    min_mod_absolute = 1_000
611    stale_churn_pct = 25.0
612
613    items: list[CheckItem] = []
614    for (
615        table_name,
616        n_live,
617        n_mod,
618        last_any_analyze,
619        bytes_,
620        pg_class_reltuples,
621    ) in rows:
622        n_live = n_live or 0
623        n_mod = n_mod or 0
624        bytes_ = bytes_ or 0
625
626        # Skip trivially small tables with little activity — false-positive magnet.
627        if bytes_ < min_table_bytes and n_mod < min_mod_absolute:
628            continue
629
630        reason = None
631        detail = None
632
633        truly_never_analyzed = pg_class_reltuples is not None and pg_class_reltuples < 0
634
635        if truly_never_analyzed:
            # Inner threshold (10 MB) is intentionally stricter than the outer
            # (1 MB): on small tables a never-analyzed finding is mostly noise,
            # but on larger tables or high-modification workloads missing stats
            # directly skew planner choices.
            if n_mod >= min_mod_absolute or bytes_ >= 10_000_000:
                reason = "never analyzed"
                detail = (
                    f"never analyzed, table size {_format_bytes(bytes_)} "
                    f"— planner statistics are absent and n_live_tup is unreliable"
                )
        elif last_any_analyze is None:
            # pg_class.reltuples >= 0 but timestamps are gone → pg_stat_reset()
            # recently fired; pg_statistic is still valid, we just can't assess
            # freshness until counters repopulate. Skip rather than alarm.
            continue
        else:
            if n_live > 0 and n_mod >= min_mod_absolute:
                churn_pct = (n_mod / n_live) * 100
                if churn_pct >= stale_churn_pct:
                    reason = "stale"
                    age = str(last_any_analyze)[:19]
                    detail = (
                        f"{n_mod:,} tuples modified ({churn_pct:.1f}% churn) "
                        f"since last analyze at {age}"
                    )

        if reason is None:
            continue

        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                detail=detail or "",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=(
                    f'Run ANALYZE "{table_name}"; to refresh planner statistics. '
                    "If this recurs, consider tuning per-table "
                    "autovacuum_analyze_scale_factor."
                ),
                caveats=[],
            )
        )

    return CheckResult(
        name="stats_freshness",
        label="Stats freshness",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables have stale stats" if items else "all ok",
        items=items,
        message="",
        tier="operational",
    )


def check_missing_index_candidates(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Tables whose sequential-scan activity suggests a missing index.

    Flags a table if either heuristic fires:

      A. pghero-style ratio — tables where a significant share of scans are
         sequential rather than indexed (catches tables with no usable index
         for their hot path).
      B. Rows-per-seq-scan — tables where each sequential scan reads many
         rows (catches tables with good coverage for most queries but a
         cold-path query doing expensive scans; pghero's ratio misses these).

    Uses pg_relation_size (not n_live_tup) for the size floor because
    n_live_tup is unreliable when ANALYZE hasn't run.

    When pg_stat_statements is installed, drills into the top contributing
    queries for each flagged table so the finding is actionable rather than
    "go investigate this table."
    """
    min_table_bytes = 10_000_000  # 10 MB
    ratio_threshold_pct = 5
    min_rows_per_seq_scan = 1_000
    min_seq_scans = 10

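    # States handled below: "usable", "not_installed", "no_permission",
    # plus a catch-all for installed-but-no-matching-queries-yet.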
    pgss_state = _pgss_usable(cursor)
    pgss_usable = pgss_state == "usable"

    cursor.execute(
        """
        SELECT
            relname AS table_name,
            seq_scan,
            seq_tup_read,
            idx_scan,
            pg_relation_size(relid) AS table_bytes
        FROM pg_catalog.pg_stat_user_tables
        WHERE schemaname = 'public'
          AND pg_relation_size(relid) >= %(min_bytes)s
        """,
        {"min_bytes": min_table_bytes},
    )
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, seq_scan, seq_tup_read, idx_scan, bytes_ in rows:
        seq_scan = seq_scan or 0
        seq_tup_read = seq_tup_read or 0
        idx_scan = idx_scan or 0
        total_scans = seq_scan + idx_scan

        reasons: list[str] = []

        # Rule A — pghero-style ratio. Includes the idx_scan=0 case: a big
        # table with many seq scans and zero index scans is the clearest
        # possible sign that no index is serving the hot path.
        # (seq_scan >= min_seq_scans implies total_scans > 0.)
        if seq_scan >= min_seq_scans:
            seq_pct = 100.0 * seq_scan / total_scans
            if seq_pct > ratio_threshold_pct:
                reasons.append(
                    f"{seq_pct:.1f}% of scans are sequential "
                    f"({seq_scan:,} seq / {idx_scan:,} idx)"
                )

        # Rule B — amplification per seq scan.
        if seq_scan >= min_seq_scans:
            rows_per_seq = seq_tup_read // seq_scan
            if rows_per_seq >= min_rows_per_seq_scan:
                reasons.append(
                    f"{rows_per_seq:,} rows read per seq scan "
                    f"({seq_scan:,} scans, {seq_tup_read:,} tuples total)"
                )

        if not reasons:
            continue

        # Drill into the top contributing queries if pg_stat_statements is there.
        top_queries = (
            _top_queries_for_table(cursor, table_name, limit=3) if pgss_usable else []
        )
        if top_queries:
            query_lines = []
            for i, q in enumerate(top_queries, 1):
                query_lines.append(
                    f"    {i}. {q['blks_per_call']:,.0f} blks/call, "
                    f"{q['rows_per_call']} rows/call, "
                    f"{q['calls']:,} calls, {q['total_ms']:.0f}ms total"
                )
                query_lines.append(f"       {q['query'][:240]}")
            queries_detail = "\n  top queries:\n" + "\n".join(query_lines)
        elif pgss_state == "not_installed":
            queries_detail = "\n  (install pg_stat_statements for per-query drill-down)"
        elif pgss_state == "no_permission":
            queries_detail = (
                "\n  (pg_stat_statements is installed but this role cannot "
                "read it — grant pg_read_all_stats for per-query drill-down)"
            )
        else:
            queries_detail = (
                "\n  (pg_stat_statements is installed but has no matching "
                "queries yet — run production traffic or wait for stats to "
                "accumulate, then re-check)"
            )

        source, package, model_class, model_file = _table_info(table_name, table_owners)
        index_snippet = 'postgres.Index(name="...", fields=["..."])'
        # Suggestions reference "the top queries above" when we actually
        # printed some; otherwise point the user at the general workflow
        # (run pg_stat_statements / EXPLAIN against suspected hot queries).
        explain_lead = (
            "Run EXPLAIN on the top queries above"
            if top_queries
            else "Identify the queries hitting this table (pg_stat_statements, app logs, or slow-query log) and EXPLAIN them"
        )
        if model_class and model_file:
            suggestion = (
                f"{explain_lead}. If they share "
                f"WHERE/JOIN columns that aren't indexed, edit "
                f"{_display_path(model_file)} and add a {index_snippet} to "
                f"{model_class}.model_options, then `plain postgres sync`."
            )
        elif model_class:
            suggestion = (
                f"{explain_lead}. If they share "
                f"WHERE/JOIN columns that aren't indexed, add a "
                f"{index_snippet} to {model_class}.model_options "
                f"and run `plain postgres sync`."
            )
        elif source == "package":
            suggestion = f"Managed by {package} — report upstream if this persists."
        else:
            suggestion = (
                f"{explain_lead}. If they share "
                f"WHERE/JOIN columns that aren't indexed, add an index to "
                f'"{table_name}".'
            )

        items.append(
            CheckItem(
                table=table_name,
                name=table_name,
                detail=(
                    f"{_format_bytes(bytes_)}; " + "; ".join(reasons) + queries_detail
                ),
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=suggestion,
                caveats=[],
            )
        )

    items.sort(key=lambda it: it["table"])

    return CheckResult(
        name="missing_index_candidates",
        label="Missing index candidates",
        status="warning" if items else "ok",
        summary=f"{len(items)} tables flagged" if items else "none",
        items=items,
        message="",
        tier="warning",
    )

# Check registration and ordering live in runner.py alongside ALL_CHECKS.
# Tier is set per-check on each CheckResult returned above; the module
# docstring explains which checks land in the warning vs. operational
# tier and why.