  1"""Structural checks — always-real findings against the current schema.
  2
  3These fire the moment the condition exists; they don't depend on
  4accumulated stats since the last reset. Each has an immediately
  5actionable remediation in the user's code (or SQL for unmanaged tables)."""
  6
  7from __future__ import annotations
  8
  9from typing import Any
 10
 11from .helpers import _index_suggestion
 12from .ownership import _table_info
 13from .types import CheckItem, CheckResult, TableOwner
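
# Usage sketch (illustrative only; the connection and the ``table_owners``
# mapping are assumed to exist already, they are not built in this module):
#
#     with connection.cursor() as cursor:
#         result = check_invalid_indexes(cursor, table_owners)
#         if result.status != "ok":
#             for item in result.items:
#                 print(f"{item.table}: {item.name}: {item.suggestion}")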


def check_invalid_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes from failed CREATE INDEX CONCURRENTLY — maintained on writes, never used for reads."""
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE NOT i.indisvalid
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, index_name, index_size in rows:
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=index_name,
                detail=index_size,
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    model_class=model_class,
                    model_file=model_file,
                    app_suggestion=f'Drop and re-run the migration that created it: DROP INDEX CONCURRENTLY "{index_name}";',
                    unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{index_name}";',
                ),
                caveats=[],
            )
        )

    return CheckResult(
        name="invalid_indexes",
        label="Invalid indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
        tier="warning",
    )


def check_duplicate_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
 69    """Indexes redundant with another on the same table.
 70
 71    Two flavors are flagged:
 72
 73    - **Prefix duplicate** — a non-unique index whose columns are a strict
 74      prefix of another index's. The longer index covers the same queries.
 75    - **Exact duplicate** — same columns and access method as another index.
 76      If the pair contains a unique-backed btree, the non-unique one is the
 77      redundant one (the unique already covers the queries and enforces
 78      uniqueness). If both are non-unique, the alphabetically later name
 79      gets flagged (deterministic).
 80
 81    Each index column's canonical definition comes from
 82    ``pg_get_indexdef(indexrelid, k, false)`` — that text includes the
 83    column name or expression, plus any non-default operator class,
 84    collation, or sort order. Comparing per-column definitions means we
 85    catch expression duplicates (e.g. two ``LOWER(email)`` columns) and
 86    won't false-positive across different opclasses or collations.
 87    Access method (``pg_am.amname``) is checked separately because the
 88    per-column text doesn't include it — a hash and btree on the same
 89    column have identical column text but support different operators.
 90    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            ci.relname AS index_name,
            am.amname AS access_method,
            array(
                SELECT pg_get_indexdef(i.indexrelid, k, false)
                FROM generate_series(1, i.indnatts) AS k
            ) AS column_defs,
            i.indisunique,
            pg_size_pretty(pg_relation_size(ci.oid)) AS index_size
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid
        JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid
        JOIN pg_catalog.pg_am am ON am.oid = ci.relam
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        WHERE n.nspname = 'public'
          AND i.indisvalid
          AND i.indpred IS NULL
        ORDER BY ct.relname, ci.relname
    """)
    rows = cursor.fetchall()

    by_table: dict[str, list[tuple[str, str, list[str], bool, str]]] = {}
    for table_name, index_name, am_name, defs, is_unique, size in rows:
        by_table.setdefault(table_name, []).append(
            (index_name, am_name, defs, is_unique, size)
        )

    items: list[CheckItem] = []
    flagged: set[str] = set()
    for table_name, indexes in by_table.items():
        for i, idx_a in enumerate(indexes):
            for idx_b in indexes[i + 1 :]:
                # Try both orderings — prefix-redundancy is asymmetric, and
                # exact-duplicate flagging picks one side deterministically.
                for shorter, longer in [(idx_a, idx_b), (idx_b, idx_a)]:
                    name_s, am_s, defs_s, unique_s, size_s = shorter
                    name_l, am_l, defs_l, unique_l, _ = longer
                    # Different access methods serve different operators
                    # (e.g. hash supports `=` only, btree supports ordering).
                    if name_s in flagged or am_s != am_l:
                        continue

                    is_prefix_dup = (
                        not unique_s
                        and len(defs_s) < len(defs_l)
                        and defs_l[: len(defs_s)] == defs_s
                    )
                    is_exact_dup = (
                        defs_s == defs_l
                        and not unique_s
                        and (unique_l or name_s > name_l)
                    )

                    if not (is_prefix_dup or is_exact_dup):
                        continue

                    source, package, model_class, model_file = _table_info(
                        table_name, table_owners
                    )
                    app_suggestion = f'Remove "{name_s}" from model indexes/constraints, then run plain postgres sync'

                    items.append(
                        CheckItem(
                            table=table_name,
                            name=name_s,
                            detail=f"{size_s}, redundant with {name_l}",
                            source=source,
                            package=package,
                            model_class=model_class,
                            model_file=model_file,
                            suggestion=_index_suggestion(
                                source=source,
                                package=package,
                                model_class=model_class,
                                model_file=model_file,
                                app_suggestion=app_suggestion,
                                unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{name_s}";',
                            ),
                            caveats=[],
                        )
                    )
                    flagged.add(name_s)

    return CheckResult(
        name="duplicate_indexes",
        label="Duplicate indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
        tier="warning",
    )


def check_missing_fk_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
190    """Foreign key columns without a leading index — JOINs on these do sequential scans.
191
192    Partial indexes (``WHERE`` clause set on ``pg_index.indpred``) don't
193    count: Postgres only uses them for queries that imply the partial
194    predicate, so FK lookups and cascade deletes outside that predicate
195    still sequential-scan. The narrow ``WHERE fk IS NOT NULL`` case —
196    which Postgres can match to ``WHERE fk = ?`` — is conservatively
197    treated as not covering; users wanting guaranteed FK coverage should
198    add a regular non-partial index. Match the preflight's coverage rule.
199    """
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            a.attname AS column_name,
            cc.relname AS referenced_table,
            c.conname AS constraint_name
        FROM pg_catalog.pg_constraint c
        JOIN pg_catalog.pg_class ct ON ct.oid = c.conrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = c.conkey[1]
        JOIN pg_catalog.pg_class cc ON cc.oid = c.confrelid
        WHERE c.contype = 'f'
          AND array_length(c.conkey, 1) = 1
          AND n.nspname = 'public'
          AND NOT EXISTS (
              SELECT 1
              FROM pg_catalog.pg_index i
              WHERE i.indrelid = c.conrelid
                AND i.indkey[0] = c.conkey[1]
                AND i.indpred IS NULL
          )
        ORDER BY ct.relname, a.attname
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    for table_name, column_name, referenced_table, constraint_name in rows:
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=f"{table_name}.{column_name}",
                detail=f"references {referenced_table}",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    model_class=model_class,
                    model_file=model_file,
                    app_suggestion=f'Add an Index on ["{column_name}"] to the model, then run plain postgres sync',
                    unmanaged_suggestion=f'CREATE INDEX CONCURRENTLY ON "{table_name}" ("{column_name}");',
                ),
                caveats=[],
            )
        )

    return CheckResult(
        name="missing_fk_indexes",
        label="Missing FK indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
        tier="warning",
    )


def check_sequence_exhaustion(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
263    """Identity sequences approaching their type max."""
    cursor.execute("""
        WITH sequences AS (
            SELECT
                s.seqrelid,
                s.seqtypid,
                s.seqmax,
                ps.last_value,
                ps.start_value
            FROM pg_catalog.pg_sequence s
            JOIN pg_catalog.pg_class c ON c.oid = s.seqrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_sequences ps ON ps.schemaname = n.nspname
                AND ps.sequencename = c.relname
        )
        SELECT
            d.refobjid::regclass AS table_name,
            a.attname AS column_name,
            seq.seqtypid::regtype AS data_type,
            COALESCE(seq.last_value, seq.start_value) AS current_value,
            seq.seqmax AS max_value,
            ROUND(
                100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
            ) AS pct_used
        FROM sequences seq
        JOIN pg_catalog.pg_depend d ON d.objid = seq.seqrelid
            AND d.deptype IN ('a', 'i')
            AND d.classid = 'pg_class'::regclass
            AND d.refclassid = 'pg_class'::regclass
        JOIN pg_catalog.pg_attribute a ON a.attrelid = d.refobjid
            AND a.attnum = d.refobjsubid
        WHERE ROUND(
            100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
        ) > 50
        ORDER BY pct_used DESC
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    worst_pct = 0.0
    for table_name, column_name, data_type, current_value, max_value, pct_used in rows:
        pct = float(pct_used)
        worst_pct = max(worst_pct, pct)
        table_str = str(table_name)
        source, package, model_class, model_file = _table_info(table_str, table_owners)
        items.append(
            CheckItem(
                table=table_str,
                name=f"{table_str}.{column_name}",
                detail=f"{data_type}, {pct_used}% used ({current_value:,} / {max_value:,})",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=f'ALTER TABLE "{table_str}" ALTER COLUMN "{column_name}" SET DATA TYPE bigint;',
                caveats=[],
            )
        )

    if worst_pct >= 90:
        status = "critical"
    elif items:
        status = "warning"
    else:
        status = "ok"

    return CheckResult(
        name="sequence_exhaustion",
        label="Sequence exhaustion",
        status=status,
        summary=f"{worst_pct}% worst" if items else "all ok",
        items=items,
        message="",
        tier="warning",
    )