v0.150.0
  1"""Preflight checks for index coverage on app models."""
  2
  3from __future__ import annotations
  4
  5from typing import Any
  6
  7from plain.packages import packages_registry
  8from plain.postgres.constraints import UniqueConstraint
  9from plain.postgres.expressions import F, OrderBy
 10from plain.postgres.fields.related import ForeignKeyField
 11from plain.postgres.query_utils import Q
 12from plain.postgres.registry import models_registry
 13from plain.preflight import PreflightCheck, PreflightResult, register_check
 14
 15
 16def _get_app_models() -> list[Any]:
 17    """Return models from the user's app packages only (not framework/third-party)."""
 18    app_models = []
 19    for package_config in packages_registry.get_package_configs():
 20        if package_config.name.startswith("app."):
 21            app_models.extend(
 22                models_registry.get_models(package_label=package_config.package_label)
 23            )
 24    return app_models
 25
 26
 27def _collect_model_indexes(model: Any) -> list[tuple[str, list[str], bool]]:
 28    """Collect (name, fields, is_unique) for non-partial indexes/constraints.
 29
 30    Partials are skipped: they only apply to rows matching their predicate,
 31    so they're never interchangeable with a full index for duplicate
 32    detection or reorder suggestions.
 33    """
 34    all_indexes: list[tuple[str, list[str], bool]] = []
 35
 36    for index in model.model_options.indexes:
 37        if index.fields and not index.is_partial:
 38            fields = [f.lstrip("-") for f in index.fields]
 39            all_indexes.append((index.name, fields, False))
 40
 41    for constraint in model.model_options.constraints:
 42        if (
 43            isinstance(constraint, UniqueConstraint)
 44            and constraint.fields
 45            and not constraint.is_partial
 46        ):
 47            all_indexes.append((constraint.name, list(constraint.fields), True))
 48
 49    return all_indexes
 50
 51
 52def _bare_column_name(expr: Any) -> str | None:
 53    """Return the column name if `expr` resolves to a bare column, else `None`.
 54
 55    Postgres can range-scan the leading column of an index for `WHERE col = ?`
 56    only when that column is a real attribute, not an expression — so a
 57    compound leading expression like `Lower("email")` returns `None` here.
 58    Sort direction (`F("col").desc()` / `OrderBy(F)`) doesn't affect equality
 59    lookups, so we unwrap one layer of `OrderBy` around a bare `F`.
 60    """
 61    if isinstance(expr, OrderBy):
 62        expr = expr.expression
 63    if isinstance(expr, F):
 64        return expr.name
 65    return None
 66
 67
 68def _leading_field_name(
 69    fields: tuple[str, ...] | list[str], expressions: tuple
 70) -> str | None:
 71    """The leading column's field name, or `None` if it's an expression."""
 72    if fields:
 73        return fields[0].lstrip("-")
 74    if expressions:
 75        return _bare_column_name(expressions[0])
 76    return None
 77
 78
 79def _condition_is_not_null_on(condition: Q, field_name: str) -> bool:
 80    """True if `condition` is exactly ``Q(<field_name>__isnull=False)``."""
 81    return (
 82        not condition.negated
 83        and len(condition.children) == 1
 84        and condition.children[0] == (f"{field_name}__isnull", False)
 85    )
 86
 87
 88def _fk_covered_field_names(model: Any) -> set[str]:
 89    """Field names that appear as the leading column of an index or unique
 90    constraint — covering arbitrary FK lookups via the index's leading
 91    column. Includes expression-based indexes/constraints whose leading
 92    expression is a bare ``F(field_name)``.
 93
 94    Partial indexes/constraints (declared with ``condition=Q(...)``) are
 95    excluded: Postgres can only use them for queries whose predicate
 96    implies the partial-index predicate, so an FK lookup or cascade
 97    delete that doesn't filter by that condition still does a sequential
 98    scan. The one exception is a predicate of exactly
 99    ``Q(<fk>__isnull=False)`` on the leading FK itself — every FK lookup
100    and referencing-side sweep is a ``WHERE fk = ?``, which implies
101    ``fk IS NOT NULL``, so Postgres can always use that partial. Match
102    the doctor's coverage rule in
103    ``introspection/health/checks_structural.py``.
104    """
105    covered: set[str] = set()
106
107    def _record(index_or_constraint: Any) -> None:
108        leading = _leading_field_name(
109            index_or_constraint.fields, index_or_constraint.expressions
110        )
111        if leading is None:
112            return
113        if not index_or_constraint.is_partial or _condition_is_not_null_on(
114            index_or_constraint.condition, leading
115        ):
116            covered.add(leading)
117
118    for index in model.model_options.indexes:
119        _record(index)
120
121    for constraint in model.model_options.constraints:
122        if isinstance(constraint, UniqueConstraint):
123            _record(constraint)
124
125    return covered
126
127
128def _composite_containing(model: Any, field_name: str) -> tuple[str, list[str]] | None:
129    """First non-partial index/constraint with `field_name` at a non-leading position.
130
131    A non-leading column doesn't cover the FK, but reordering the composite to
132    lead with it often can — worth suggesting in the fix message.
133    """
134    for name, fields, _is_unique in _collect_model_indexes(model):
135        if field_name in fields[1:]:
136            return name, fields
137    return None
138
139
140@register_check("postgres.missing_fk_indexes")
141class CheckMissingFKIndexes(PreflightCheck):
142    """Warns about foreign key fields without index coverage."""
143
144    def run(self) -> list[PreflightResult]:
145        results = []
146
147        for model in _get_app_models():
148            covered_fields = _fk_covered_field_names(model)
149
150            for field in model._model_meta.local_fields:
151                if (
152                    isinstance(field, ForeignKeyField)
153                    and not field.primary_key
154                    and field.name not in covered_fields
155                ):
156                    fix = (
157                        f"Foreign key '{field.name}' has no index coverage. "
158                        f"Add an Index on [\"{field.name}\"] or a constraint with '{field.name}' as the first field."
159                    )
160
161                    if composite := _composite_containing(model, field.name):
162                        composite_name, composite_fields = composite
163                        fix += (
164                            f" Alternatively, '{composite_name}' on [{', '.join(composite_fields)}] "
165                            f"already includes '{field.name}' — reordering it to put '{field.name}' first "
166                            f"covers this FK without a new index (safe when every query using it "
167                            f"filters all of its columns with equality)."
168                        )
169
170                    results.append(
171                        PreflightResult(
172                            fix=fix,
173                            obj=f"{model.model_options.label}.{field.name}",
174                            id="postgres.missing_fk_index",
175                            warning=True,
176                        )
177                    )
178
179        return results
180
181
182@register_check("postgres.duplicate_indexes")
183class CheckDuplicateIndexes(PreflightCheck):
184    """Warns about indexes redundant with other indexes or constraints.
185
186    Catches both prefix-redundancy (a 1-column index shadowed by a wider
187    composite) and exact-column duplicates (an `Index(fields=["x"])` that
188    duplicates a same-column `UniqueConstraint`).
189    """
190
191    def run(self) -> list[PreflightResult]:
192        results = []
193
194        for model in _get_app_models():
195            all_indexes = _collect_model_indexes(model)
196
197            flagged: set[str] = set()
198            for i, idx_a in enumerate(all_indexes):
199                for idx_b in all_indexes[i + 1 :]:
200                    for shorter, longer in [(idx_a, idx_b), (idx_b, idx_a)]:
201                        s_name, s_fields, s_unique = shorter
202                        l_name, l_fields, l_unique = longer
203
204                        if s_name in flagged:
205                            continue
206
207                        is_prefix_dup = (
208                            not s_unique
209                            and len(s_fields) < len(l_fields)
210                            and l_fields[: len(s_fields)] == s_fields
211                        )
212                        is_exact_dup = (
213                            s_fields == l_fields
214                            and not s_unique
215                            and (l_unique or s_name > l_name)
216                        )
217
218                        if not (is_prefix_dup or is_exact_dup):
219                            continue
220
221                        if is_prefix_dup:
222                            fix = (
223                                f"Index '{s_name}' on [{', '.join(s_fields)}] "
224                                f"is redundant with '{l_name}' on [{', '.join(l_fields)}]. "
225                                f"The longer index covers the same queries."
226                            )
227                        elif l_unique:
228                            fix = (
229                                f"Index '{s_name}' on [{', '.join(s_fields)}] "
230                                f"is redundant with '{l_name}' on the same columns. "
231                                f"The unique-backed index already covers these queries."
232                            )
233                        else:
234                            fix = (
235                                f"Index '{s_name}' on [{', '.join(s_fields)}] "
236                                f"is an exact duplicate of '{l_name}'. "
237                                f"Drop one of them."
238                            )
239
240                        results.append(
241                            PreflightResult(
242                                fix=fix,
243                                obj=model.model_options.label,
244                                id="postgres.duplicate_index",
245                                warning=True,
246                            )
247                        )
248                        flagged.add(s_name)
249
250        return results