1"""Structural checks — always-real findings against the current schema.
2
3These fire the moment the condition exists; they don't depend on
4accumulated stats since the last reset. Each has an immediately
5actionable remediation in the user's code (or SQL for unmanaged tables)."""
6
7from __future__ import annotations
8
9from typing import Any
10
11from .helpers import _index_suggestion
12from .ownership import _table_info
13from .types import CheckItem, CheckResult, TableOwner
14
15
def check_invalid_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes from failed CREATE INDEX CONCURRENTLY — maintained on writes, never used for reads."""
    cursor.execute("""
        SELECT
            s.relname AS table_name,
            s.indexrelname AS index_name,
            pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size
        FROM pg_catalog.pg_stat_user_indexes s
        JOIN pg_catalog.pg_index i ON s.indexrelid = i.indexrelid
        WHERE NOT i.indisvalid
    """)

    items: list[CheckItem] = []
    # One finding per invalid index; ownership metadata routes the remedy
    # either to the owning model file or to raw SQL for unmanaged tables.
    for tbl, idx, size in cursor.fetchall():
        owner_source, owner_pkg, owner_model, owner_file = _table_info(
            tbl, table_owners
        )
        remedy = _index_suggestion(
            source=owner_source,
            package=owner_pkg,
            model_class=owner_model,
            model_file=owner_file,
            app_suggestion=f'Drop and re-run the migration that created it: DROP INDEX CONCURRENTLY "{idx}";',
            unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{idx}";',
        )
        items.append(
            CheckItem(
                table=tbl,
                name=idx,
                detail=size,
                source=owner_source,
                package=owner_pkg,
                model_class=owner_model,
                model_file=owner_file,
                suggestion=remedy,
                caveats=[],
            )
        )

    found = bool(items)
    return CheckResult(
        name="invalid_indexes",
        label="Invalid indexes",
        status="warning" if found else "ok",
        summary=str(len(items)) if found else "none",
        items=items,
        message="",
        tier="warning",
    )
64
65
def check_duplicate_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Indexes redundant with another on the same table.

    Two flavors are flagged:

    - **Prefix duplicate** — a non-unique index whose columns are a strict
      prefix of another index's. The longer index covers the same queries.
    - **Exact duplicate** — same columns and access method as another index.
      If the pair contains a unique-backed btree, the non-unique one is the
      redundant one (the unique already covers the queries and enforces
      uniqueness). If both are non-unique, the alphabetically later name
      gets flagged (deterministic).

    Each index column's canonical definition comes from
    ``pg_get_indexdef(indexrelid, k, false)`` — that text includes the
    column name or expression, plus any non-default operator class,
    collation, or sort order. Comparing per-column definitions means we
    catch expression duplicates (e.g. two ``LOWER(email)`` columns) and
    won't false-positive across different opclasses or collations.
    Access method (``pg_am.amname``) is checked separately because the
    per-column text doesn't include it — a hash and btree on the same
    column have identical column text but support different operators.
    """
    # Partial indexes (indpred set) and invalid indexes are excluded up
    # front — neither can be declared redundant with a full valid index.
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            ci.relname AS index_name,
            am.amname AS access_method,
            array(
                SELECT pg_get_indexdef(i.indexrelid, k, false)
                FROM generate_series(1, i.indnatts) AS k
            ) AS column_defs,
            i.indisunique,
            pg_size_pretty(pg_relation_size(ci.oid)) AS index_size
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid
        JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid
        JOIN pg_catalog.pg_am am ON am.oid = ci.relam
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        WHERE n.nspname = 'public'
          AND i.indisvalid
          AND i.indpred IS NULL
        ORDER BY ct.relname, ci.relname
    """)
    rows = cursor.fetchall()

    # Bucket rows per table: redundancy only exists between indexes on the
    # same relation, so cross-table pairs are never considered. The SQL
    # ORDER BY keeps each bucket's order (and thus the output) stable.
    by_table: dict[str, list[tuple[str, str, list[str], bool, str]]] = {}
    for table_name, index_name, am_name, defs, is_unique, size in rows:
        by_table.setdefault(table_name, []).append(
            (index_name, am_name, defs, is_unique, size)
        )

    items: list[CheckItem] = []
    # Index names already reported — guarantees an index is flagged at most
    # once even when it is redundant with several siblings.
    flagged: set[str] = set()
    for table_name, indexes in by_table.items():
        # Examine every unordered pair of indexes on this table once.
        for i, idx_a in enumerate(indexes):
            for idx_b in indexes[i + 1 :]:
                # Try both orderings — prefix-redundancy is asymmetric, and
                # exact-duplicate flagging picks one side deterministically.
                for shorter, longer in [(idx_a, idx_b), (idx_b, idx_a)]:
                    name_s, am_s, defs_s, unique_s, size_s = shorter
                    name_l, am_l, defs_l, unique_l, _ = longer
                    # Different access methods serve different operators
                    # (e.g. hash supports `=` only, btree supports ordering).
                    if name_s in flagged or am_s != am_l:
                        continue

                    # Strict column prefix of a longer index: the longer one
                    # answers the same queries. Unique indexes are exempt —
                    # they enforce a constraint the longer one may not.
                    is_prefix_dup = (
                        not unique_s
                        and len(defs_s) < len(defs_l)
                        and defs_l[: len(defs_s)] == defs_s
                    )
                    # Identical column lists: drop the non-unique one when a
                    # unique twin exists, otherwise break the tie by name so
                    # reruns always flag the same index.
                    is_exact_dup = (
                        defs_s == defs_l
                        and not unique_s
                        and (unique_l or name_s > name_l)
                    )

                    if not (is_prefix_dup or is_exact_dup):
                        continue

                    source, package, model_class, model_file = _table_info(
                        table_name, table_owners
                    )
                    app_suggestion = f'Remove "{name_s}" from model indexes/constraints, then run plain postgres sync'

                    items.append(
                        CheckItem(
                            table=table_name,
                            name=name_s,
                            detail=f"{size_s}, redundant with {name_l}",
                            source=source,
                            package=package,
                            model_class=model_class,
                            model_file=model_file,
                            suggestion=_index_suggestion(
                                source=source,
                                package=package,
                                model_class=model_class,
                                model_file=model_file,
                                app_suggestion=app_suggestion,
                                unmanaged_suggestion=f'DROP INDEX CONCURRENTLY "{name_s}";',
                            ),
                            caveats=[],
                        )
                    )
                    flagged.add(name_s)

    return CheckResult(
        name="duplicate_indexes",
        label="Duplicate indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
        tier="warning",
    )
185
186
def check_missing_fk_indexes(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Foreign key columns without a leading index — JOINs on these do sequential scans.

    Partial indexes (``WHERE`` clause set on ``pg_index.indpred``) don't
    count: Postgres only uses them for queries that imply the partial
    predicate, so FK lookups and cascade deletes outside that predicate
    still sequential-scan. The narrow ``WHERE fk IS NOT NULL`` case —
    which Postgres can match to ``WHERE fk = ?`` — is conservatively
    treated as not covering; users wanting guaranteed FK coverage should
    add a regular non-partial index. Match the preflight's coverage rule.

    Invalid indexes (``NOT indisvalid``, e.g. leftovers from a failed
    CREATE INDEX CONCURRENTLY) don't count either: they are never used
    for reads, so an FK "covered" only by an invalid index still
    sequential-scans.
    """
    # Only single-column FKs are checked (array_length(conkey, 1) = 1).
    # indkey is 0-based while conkey is 1-based, so indkey[0] vs conkey[1]
    # compares the leading index column against the FK column — a leading
    # match is what makes the index usable for FK lookups.
    cursor.execute("""
        SELECT
            ct.relname AS table_name,
            a.attname AS column_name,
            cc.relname AS referenced_table,
            c.conname AS constraint_name
        FROM pg_catalog.pg_constraint c
        JOIN pg_catalog.pg_class ct ON ct.oid = c.conrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace
        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = c.conkey[1]
        JOIN pg_catalog.pg_class cc ON cc.oid = c.confrelid
        WHERE c.contype = 'f'
          AND array_length(c.conkey, 1) = 1
          AND n.nspname = 'public'
          AND NOT EXISTS (
              SELECT 1
              FROM pg_catalog.pg_index i
              WHERE i.indrelid = c.conrelid
                AND i.indkey[0] = c.conkey[1]
                AND i.indisvalid
                AND i.indpred IS NULL
          )
        ORDER BY ct.relname, a.attname
    """)
    rows = cursor.fetchall()

    items: list[CheckItem] = []
    # constraint_name is part of the row shape but not surfaced in the item.
    for table_name, column_name, referenced_table, constraint_name in rows:
        source, package, model_class, model_file = _table_info(table_name, table_owners)
        items.append(
            CheckItem(
                table=table_name,
                name=f"{table_name}.{column_name}",
                detail=f"references {referenced_table}",
                source=source,
                package=package,
                model_class=model_class,
                model_file=model_file,
                suggestion=_index_suggestion(
                    source=source,
                    package=package,
                    model_class=model_class,
                    model_file=model_file,
                    app_suggestion=f'Add an Index on ["{column_name}"] to the model, then run plain postgres sync',
                    unmanaged_suggestion=f'CREATE INDEX CONCURRENTLY ON "{table_name}" ("{column_name}");',
                ),
                caveats=[],
            )
        )

    return CheckResult(
        name="missing_fk_indexes",
        label="Missing FK indexes",
        status="warning" if items else "ok",
        summary=str(len(items)) if items else "none",
        items=items,
        message="",
        tier="warning",
    )
258
259
def check_sequence_exhaustion(
    cursor: Any, table_owners: dict[str, TableOwner]
) -> CheckResult:
    """Identity sequences approaching their type max."""
    # Only sequences past 50% of their max are returned; last_value is NULL
    # until the sequence is first used, hence the COALESCE with start_value.
    cursor.execute("""
        WITH sequences AS (
            SELECT
                s.seqrelid,
                s.seqtypid,
                s.seqmax,
                ps.last_value,
                ps.start_value
            FROM pg_catalog.pg_sequence s
            JOIN pg_catalog.pg_class c ON c.oid = s.seqrelid
            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            JOIN pg_sequences ps ON ps.schemaname = n.nspname
                AND ps.sequencename = c.relname
        )
        SELECT
            d.refobjid::regclass AS table_name,
            a.attname AS column_name,
            seq.seqtypid::regtype AS data_type,
            COALESCE(seq.last_value, seq.start_value) AS current_value,
            seq.seqmax AS max_value,
            ROUND(
                100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
            ) AS pct_used
        FROM sequences seq
        JOIN pg_catalog.pg_depend d ON d.objid = seq.seqrelid
            AND d.deptype IN ('a', 'i')
            AND d.classid = 'pg_class'::regclass
            AND d.refclassid = 'pg_class'::regclass
        JOIN pg_catalog.pg_attribute a ON a.attrelid = d.refobjid
            AND a.attnum = d.refobjsubid
        WHERE ROUND(
            100.0 * COALESCE(seq.last_value, seq.start_value) / seq.seqmax, 2
        ) > 50
        ORDER BY pct_used DESC
    """)

    items: list[CheckItem] = []
    percentages: list[float] = []
    for row in cursor.fetchall():
        table_name, column_name, data_type, current_value, max_value, pct_used = row
        percentages.append(float(pct_used))
        tbl = str(table_name)
        src, pkg, model_cls, model_path = _table_info(tbl, table_owners)
        items.append(
            CheckItem(
                table=tbl,
                name=f"{tbl}.{column_name}",
                detail=f"{data_type}, {pct_used}% used ({current_value:,} / {max_value:,})",
                source=src,
                package=pkg,
                model_class=model_cls,
                model_file=model_path,
                suggestion=f'ALTER TABLE "{tbl}" ALTER COLUMN "{column_name}" SET DATA TYPE bigint;',
                caveats=[],
            )
        )

    worst_pct = max(percentages, default=0.0)
    # >=90% used is an emergency; anything the query surfaced (>50%) is
    # worth a warning; no rows means every sequence is comfortably sized.
    status = "critical" if worst_pct >= 90 else ("warning" if items else "ok")

    return CheckResult(
        name="sequence_exhaustion",
        label="Sequence exhaustion",
        status=status,
        summary=f"{worst_pct}% worst" if items else "all ok",
        items=items,
        message="",
        tier="warning",
    )