1"""Preflight checks for index coverage on app models."""
2
3from __future__ import annotations
4
5from typing import Any
6
7from plain.packages import packages_registry
8from plain.postgres.constraints import UniqueConstraint
9from plain.postgres.expressions import F, OrderBy
10from plain.postgres.fields.related import ForeignKeyField
11from plain.postgres.query_utils import Q
12from plain.postgres.registry import models_registry
13from plain.preflight import PreflightCheck, PreflightResult, register_check
14
15
def _get_app_models() -> list[Any]:
    """Gather the models registered under the user's own app packages.

    Only package configs whose name starts with ``"app."`` contribute;
    every other registered package is ignored.
    """
    return [
        model
        for config in packages_registry.get_package_configs()
        if config.name.startswith("app.")
        for model in models_registry.get_models(package_label=config.package_label)
    ]
25
26
def _collect_model_indexes(model: Any) -> list[tuple[str, list[str], bool]]:
    """Describe each full (non-partial) field-based index and unique constraint.

    Every entry is ``(name, fields, is_unique)``. Plain indexes come first
    with ``is_unique=False`` and any ``-`` ordering prefix stripped from
    their field names; unique constraints follow with ``is_unique=True``.

    Entries with no ``fields`` are left out, and so are partial ones: a
    partial index only applies to rows matching its predicate, so it is
    never interchangeable with a full index for duplicate detection or
    reorder suggestions.
    """
    options = model.model_options

    plain_entries = [
        (idx.name, [column.lstrip("-") for column in idx.fields], False)
        for idx in options.indexes
        if idx.fields and not idx.is_partial
    ]
    unique_entries = [
        (con.name, list(con.fields), True)
        for con in options.constraints
        if isinstance(con, UniqueConstraint) and con.fields and not con.is_partial
    ]
    return plain_entries + unique_entries
50
51
def _bare_column_name(expr: Any) -> str | None:
    """Name of the column `expr` refers to, or `None` if it isn't a bare column.

    Postgres can only range-scan an index's leading column for
    `WHERE col = ?` when that column is a real attribute rather than an
    expression, so a compound leading expression such as `Lower("email")`
    yields `None`. Sort direction has no bearing on equality lookups, which
    is why a single `OrderBy` wrapper (e.g. `F("col").desc()`) around a
    bare `F` is peeled off before the check.
    """
    # Strip at most one ordering wrapper; anything nested deeper is treated
    # as an expression.
    inner = expr.expression if isinstance(expr, OrderBy) else expr
    return inner.name if isinstance(inner, F) else None
66
67
def _leading_field_name(
    fields: tuple[str, ...] | list[str], expressions: tuple
) -> str | None:
    """Field name of the leading column, or `None` when there isn't a bare one.

    `fields` takes precedence: its first entry is returned with any `-`
    ordering prefix removed. Only when `fields` is empty is the first
    expression consulted, and it counts only if it is a bare column.
    """
    if not fields:
        # Expression-based definition: usable only if it leads with a plain column.
        return _bare_column_name(expressions[0]) if expressions else None
    return fields[0].lstrip("-")
77
78
def _condition_is_not_null_on(condition: Q, field_name: str) -> bool:
    """Whether `condition` is exactly ``Q(<field_name>__isnull=False)``.

    The condition must be un-negated and hold a single child equal to
    ``("<field_name>__isnull", False)``; anything else is rejected.
    """
    if condition.negated:
        return False

    clauses = condition.children
    if len(clauses) != 1:
        return False

    return clauses[0] == (f"{field_name}__isnull", False)
86
87
def _fk_covered_field_names(model: Any) -> set[str]:
    """Field names that lead some index or unique constraint on `model`.

    A field that is the leading column of an index can serve arbitrary
    FK lookups on it. Expression-based definitions count too, provided
    their leading expression is a bare ``F(field_name)``.

    Partial definitions (declared with ``condition=Q(...)``) normally do
    not count: Postgres only uses them for queries whose predicate implies
    the partial predicate, so an FK lookup or cascade delete that does not
    filter by that condition still falls back to a sequential scan. The
    single exception is a predicate of exactly ``Q(<fk>__isnull=False)``
    on the leading field itself. Every FK lookup and referencing-side
    sweep is a ``WHERE fk = ?``, which implies ``fk IS NOT NULL``, so that
    partial is always usable. This mirrors the doctor's coverage rule in
    ``introspection/health/checks_structural.py``.
    """
    options = model.model_options

    # All indexes, followed by the unique constraints; other constraint
    # kinds are not considered.
    candidates = list(options.indexes)
    candidates.extend(
        constraint
        for constraint in options.constraints
        if isinstance(constraint, UniqueConstraint)
    )

    leading_names: set[str] = set()
    for entry in candidates:
        lead = _leading_field_name(entry.fields, entry.expressions)
        if lead is None:
            # Leads with an expression (or has no columns): covers no field.
            continue
        if entry.is_partial and not _condition_is_not_null_on(entry.condition, lead):
            # Partial on anything other than "<lead> IS NOT NULL".
            continue
        leading_names.add(lead)

    return leading_names
126
127
def _composite_containing(model: Any, field_name: str) -> tuple[str, list[str]] | None:
    """Find the first full index/constraint holding `field_name` past position 0.

    Returns ``(name, fields)`` for the first match, or `None` when there is
    none. A non-leading column does not cover the FK, but reordering the
    composite to lead with it often can, which makes it worth mentioning in
    the fix message.
    """
    matches = (
        (name, columns)
        for name, columns, _unique in _collect_model_indexes(model)
        if field_name in columns[1:]
    )
    return next(matches, None)
138
139
@register_check("postgres.missing_fk_indexes")
class CheckMissingFKIndexes(PreflightCheck):
    """Warn when a foreign key on an app model is not the leading column of any index."""

    def run(self) -> list[PreflightResult]:
        """Return one warning per uncovered, non-primary-key foreign key field.

        When an existing composite already contains the field at a
        non-leading position, the fix text also suggests reordering it.
        """
        findings: list[PreflightResult] = []

        for model in _get_app_models():
            leading_names = _fk_covered_field_names(model)

            for field in model._model_meta.local_fields:
                if not isinstance(field, ForeignKeyField):
                    continue
                # Primary-key FKs and FKs that already lead an index need no warning.
                if field.primary_key or field.name in leading_names:
                    continue

                message = (
                    f"Foreign key '{field.name}' has no index coverage. "
                    f"Add an Index on [\"{field.name}\"] or a constraint with '{field.name}' as the first field."
                )

                reorderable = _composite_containing(model, field.name)
                if reorderable:
                    existing_name, existing_fields = reorderable
                    message += (
                        f" Alternatively, '{existing_name}' on [{', '.join(existing_fields)}] "
                        f"already includes '{field.name}' — reordering it to put '{field.name}' first "
                        f"covers this FK without a new index (safe when every query using it "
                        f"filters all of its columns with equality)."
                    )

                findings.append(
                    PreflightResult(
                        fix=message,
                        obj=f"{model.model_options.label}.{field.name}",
                        id="postgres.missing_fk_index",
                        warning=True,
                    )
                )

        return findings
180
181
@register_check("postgres.duplicate_indexes")
class CheckDuplicateIndexes(PreflightCheck):
    """Warn about indexes made redundant by another index or constraint.

    Two shapes are reported: prefix redundancy (an index whose columns are
    a strict leading prefix of a wider one) and exact-column duplicates
    (such as an `Index(fields=["x"])` sitting next to a same-column
    `UniqueConstraint`).
    """

    def run(self) -> list[PreflightResult]:
        """Return at most one warning per redundant non-unique index.

        Unique constraints are never reported as redundant themselves. Of
        two identical non-unique indexes, the one whose name sorts later
        is the one reported.
        """
        findings: list[PreflightResult] = []

        for model in _get_app_models():
            entries = _collect_model_indexes(model)
            reported: set[str] = set()

            for position, first in enumerate(entries):
                for second in entries[position + 1 :]:
                    # Each unordered pair is examined in both directions so
                    # either member can turn out to be the redundant one.
                    for candidate, other in ((first, second), (second, first)):
                        cand_name, cand_cols, cand_unique = candidate
                        other_name, other_cols, other_unique = other

                        # A unique constraint is never the redundant side, and
                        # each index is reported only once.
                        if cand_unique or cand_name in reported:
                            continue

                        width = len(cand_cols)
                        if width < len(other_cols) and other_cols[:width] == cand_cols:
                            # Strict leading prefix of a wider index/constraint.
                            message = (
                                f"Index '{cand_name}' on [{', '.join(cand_cols)}] "
                                f"is redundant with '{other_name}' on [{', '.join(other_cols)}]. "
                                f"The longer index covers the same queries."
                            )
                        elif cand_cols == other_cols and (
                            other_unique or cand_name > other_name
                        ):
                            if other_unique:
                                message = (
                                    f"Index '{cand_name}' on [{', '.join(cand_cols)}] "
                                    f"is redundant with '{other_name}' on the same columns. "
                                    f"The unique-backed index already covers these queries."
                                )
                            else:
                                message = (
                                    f"Index '{cand_name}' on [{', '.join(cand_cols)}] "
                                    f"is an exact duplicate of '{other_name}'. "
                                    f"Drop one of them."
                                )
                        else:
                            continue

                        findings.append(
                            PreflightResult(
                                fix=message,
                                obj=model.model_options.label,
                                id="postgres.duplicate_index",
                                warning=True,
                            )
                        )
                        reported.add(cand_name)

        return findings