v0.150.0
  1"""Preflight checks on model definitions."""
  2
  3from __future__ import annotations
  4
  5import inspect
  6from collections import defaultdict
  7from collections.abc import Callable
  8from typing import Any
  9
 10from plain.packages import packages_registry
 11from plain.postgres.registry import ModelsRegistry, models_registry
 12from plain.preflight import PreflightCheck, PreflightResult, register_check
 13
 14
 15@register_check("postgres.all_models")
 16class CheckAllModels(PreflightCheck):
 17    """Validates all model definitions for common issues."""
 18
 19    def run(self) -> list[PreflightResult]:
 20        db_table_models = defaultdict(list)
 21        # Indexes and constraints share the same Postgres namespace,
 22        # so track them together to catch cross-type collisions.
 23        relation_names = defaultdict(list)
 24        errors = []
 25        models = models_registry.get_models()
 26        for model in models:
 27            db_table_models[model.model_options.db_table].append(
 28                model.model_options.label
 29            )
 30            if not inspect.ismethod(model.preflight):
 31                errors.append(
 32                    PreflightResult(
 33                        fix=f"The '{model.__name__}.preflight()' class method is currently overridden by {model.preflight!r}.",
 34                        obj=model,
 35                        id="postgres.preflight_method_overridden",
 36                    )
 37                )
 38            else:
 39                errors.extend(model.preflight())
 40            for model_index in model.model_options.indexes:
 41                relation_names[model_index.name].append(model.model_options.label)
 42            for model_constraint in model.model_options.constraints:
 43                relation_names[model_constraint.name].append(model.model_options.label)
 44        for db_table, model_labels in db_table_models.items():
 45            if len(model_labels) != 1:
 46                model_labels_str = ", ".join(model_labels)
 47                errors.append(
 48                    PreflightResult(
 49                        fix=f"db_table '{db_table}' is used by multiple models: {model_labels_str}.",
 50                        obj=db_table,
 51                        id="postgres.duplicate_db_table",
 52                    )
 53                )
 54        for relation_name, model_labels in relation_names.items():
 55            if len(model_labels) > 1:
 56                unique_models = set(model_labels)
 57                single_model = len(unique_models) == 1
 58                errors.append(
 59                    PreflightResult(
 60                        fix="index/constraint name '{}' is not unique {} {}.".format(
 61                            relation_name,
 62                            "for model" if single_model else "among models:",
 63                            ", ".join(sorted(unique_models)),
 64                        ),
 65                        id="postgres.relation_name_not_unique_single"
 66                        if single_model
 67                        else "postgres.relation_name_not_unique_multiple",
 68                    ),
 69                )
 70        return errors
 71
 72
 73def _check_lazy_references(
 74    models_registry: ModelsRegistry, packages_registry: Any
 75) -> list[PreflightResult]:
 76    """
 77    Ensure all lazy (i.e. string) model references have been resolved.
 78
 79    Lazy references are used in various places throughout Plain, primarily in
 80    related fields and model signals. Identify those common cases and provide
 81    more helpful error messages for them.
 82    """
 83    pending_models = set(models_registry._pending_operations)
 84
 85    # Short circuit if there aren't any errors.
 86    if not pending_models:
 87        return []
 88
 89    def extract_operation(
 90        obj: Any,
 91    ) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
 92        """
 93        Take a callable found in Packages._pending_operations and identify the
 94        original callable passed to Packages.lazy_model_operation(). If that
 95        callable was a partial, return the inner, non-partial function and
 96        any arguments and keyword arguments that were supplied with it.
 97
 98        obj is a callback defined locally in Packages.lazy_model_operation() and
 99        annotated there with a `func` attribute so as to imitate a partial.
100        """
101        operation, args, keywords = obj, [], {}
102        while hasattr(operation, "func"):
103            args.extend(getattr(operation, "args", []))
104            keywords.update(getattr(operation, "keywords", {}))
105            operation = operation.func
106        return operation, args, keywords
107
108    def app_model_error(model_key: tuple[str, str]) -> str:
109        try:
110            packages_registry.get_package_config(model_key[0])
111            model_error = "app '{}' doesn't provide model '{}'".format(*model_key)
112        except LookupError:
113            model_error = f"app '{model_key[0]}' isn't installed"
114        return model_error
115
116    # Here are several functions which return CheckMessage instances for the
117    # most common usages of lazy operations throughout Plain. These functions
118    # take the model that was being waited on as an (package_label, modelname)
119    # pair, the original lazy function, and its positional and keyword args as
120    # determined by extract_operation().
121
122    def field_error(
123        model_key: tuple[str, str],
124        func: Callable[..., Any],
125        args: list[Any],
126        keywords: dict[str, Any],
127    ) -> PreflightResult:
128        error_msg = (
129            "The field %(field)s was declared with a lazy reference "
130            "to '%(model)s', but %(model_error)s."
131        )
132        params = {
133            "model": ".".join(model_key),
134            "field": keywords["field"],
135            "model_error": app_model_error(model_key),
136        }
137        return PreflightResult(
138            fix=error_msg % params,
139            obj=keywords["field"],
140            id="fields.lazy_reference_not_resolvable",
141        )
142
143    def default_error(
144        model_key: tuple[str, str],
145        func: Callable[..., Any],
146        args: list[Any],
147        keywords: dict[str, Any],
148    ) -> PreflightResult:
149        error_msg = (
150            "%(op)s contains a lazy reference to %(model)s, but %(model_error)s."
151        )
152        params = {
153            "op": func,
154            "model": ".".join(model_key),
155            "model_error": app_model_error(model_key),
156        }
157        return PreflightResult(
158            fix=error_msg % params,
159            obj=func,
160            id="postgres.lazy_reference_resolution_failed",
161        )
162
163    # Maps common uses of lazy operations to corresponding error functions
164    # defined above. If a key maps to None, no error will be produced.
165    # default_error() will be used for usages that don't appear in this dict.
166    known_lazy = {
167        ("plain.postgres.fields.related", "resolve_related_class"): field_error,
168    }
169
170    def build_error(
171        model_key: tuple[str, str],
172        func: Callable[..., Any],
173        args: list[Any],
174        keywords: dict[str, Any],
175    ) -> PreflightResult | None:
176        key = (func.__module__, func.__name__)  # ty: ignore[unresolved-attribute]
177        error_fn = known_lazy.get(key, default_error)
178        return error_fn(model_key, func, args, keywords) if error_fn else None
179
180    return sorted(
181        filter(
182            None,
183            (
184                build_error(model_key, *extract_operation(func))
185                for model_key in pending_models
186                for func in models_registry._pending_operations[model_key]
187            ),
188        ),
189        key=lambda error: error.fix,
190    )
191
192
193@register_check("postgres.lazy_references")
194class CheckLazyReferences(PreflightCheck):
195    """Ensures all lazy (string) model references have been resolved."""
196
197    def run(self) -> list[PreflightResult]:
198        return _check_lazy_references(models_registry, packages_registry)