1"""Preflight checks on model definitions."""
2
3from __future__ import annotations
4
5import inspect
6from collections import defaultdict
7from collections.abc import Callable
8from typing import Any
9
10from plain.packages import packages_registry
11from plain.postgres.registry import ModelsRegistry, models_registry
12from plain.preflight import PreflightCheck, PreflightResult, register_check
13
14
@register_check("postgres.all_models")
class CheckAllModels(PreflightCheck):
    """Preflight check that inspects every registered model definition."""

    def run(self) -> list[PreflightResult]:
        """
        Collect preflight results across all models in the registry.

        Each model contributes the results of its own ``preflight()`` class
        method (or a single result if that attribute is no longer a method).
        Afterwards, results are added for any ``db_table`` claimed by more
        than one model and for any index/constraint name that occurs more
        than once.
        """
        results = []
        labels_by_table = defaultdict(list)
        # Indexes and constraints live in one Postgres namespace, so their
        # names are pooled into a single mapping; that way an index colliding
        # with a constraint is caught as well.
        labels_by_relation = defaultdict(list)

        for model in models_registry.get_models():
            options = model.model_options
            labels_by_table[options.db_table].append(options.label)

            if inspect.ismethod(model.preflight):
                results.extend(model.preflight())
            else:
                # Something replaced the class method, so it cannot be
                # called as one; report that instead of running it.
                results.append(
                    PreflightResult(
                        fix=f"The '{model.__name__}.preflight()' class method is currently overridden by {model.preflight!r}.",
                        obj=model,
                        id="postgres.preflight_method_overridden",
                    )
                )

            for named_relation in (*options.indexes, *options.constraints):
                labels_by_relation[named_relation.name].append(options.label)

        for db_table, model_labels in labels_by_table.items():
            if len(model_labels) == 1:
                continue
            model_labels_str = ", ".join(model_labels)
            results.append(
                PreflightResult(
                    fix=f"db_table '{db_table}' is used by multiple models: {model_labels_str}.",
                    obj=db_table,
                    id="postgres.duplicate_db_table",
                )
            )

        for relation_name, model_labels in labels_by_relation.items():
            if len(model_labels) <= 1:
                continue
            distinct_labels = set(model_labels)
            # The same name repeated inside one model gets a different
            # wording and id than a name shared between several models.
            if len(distinct_labels) == 1:
                scope = "for model"
                result_id = "postgres.relation_name_not_unique_single"
            else:
                scope = "among models:"
                result_id = "postgres.relation_name_not_unique_multiple"
            results.append(
                PreflightResult(
                    fix="index/constraint name '{}' is not unique {} {}.".format(
                        relation_name,
                        scope,
                        ", ".join(sorted(distinct_labels)),
                    ),
                    id=result_id,
                ),
            )

        return results
71
72
def _check_lazy_references(
    models_registry: ModelsRegistry, packages_registry: Any
) -> list[PreflightResult]:
    """
    Report every lazy (i.e. string) model reference that is still unresolved.

    The keys of ``models_registry._pending_operations`` are the models that
    something is still waiting on. One result is built for each operation
    pending on each of those models, with a more specific message for the
    known related-field case, and the results are returned sorted by their
    ``fix`` text.
    """
    pending_models = set(models_registry._pending_operations)
    if not pending_models:
        # Nothing is waiting on a model, so there is nothing to report.
        return []

    def unwrap_operation(
        obj: Any,
    ) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
        """
        Peel partial-like wrappers off a pending operation.

        As long as the current object exposes a ``func`` attribute, its
        ``args`` and ``keywords`` (when present) are accumulated and the walk
        continues with ``func``. Returns the innermost object together with
        the collected positional and keyword arguments.
        """
        collected_args: list[Any] = []
        collected_keywords: dict[str, Any] = {}
        target = obj
        while hasattr(target, "func"):
            collected_args.extend(getattr(target, "args", []))
            collected_keywords.update(getattr(target, "keywords", {}))
            target = target.func
        return target, collected_args, collected_keywords

    def describe_missing_model(model_key: tuple[str, str]) -> str:
        """Explain why the model is missing: unknown package or unknown model."""
        try:
            packages_registry.get_package_config(model_key[0])
            return "app '{}' doesn't provide model '{}'".format(*model_key)
        except LookupError:
            return f"app '{model_key[0]}' isn't installed"

    # The result builders below all share one signature: the awaited model as
    # a (package_label, modelname) pair, the unwrapped lazy function, and the
    # positional and keyword arguments gathered by unwrap_operation().

    def field_error(
        model_key: tuple[str, str],
        func: Callable[..., Any],
        args: list[Any],
        keywords: dict[str, Any],
    ) -> PreflightResult:
        """Build the result for a field declared with an unresolved reference."""
        template = (
            "The field %(field)s was declared with a lazy reference "
            "to '%(model)s', but %(model_error)s."
        )
        values = dict(
            model=".".join(model_key),
            field=keywords["field"],
            model_error=describe_missing_model(model_key),
        )
        return PreflightResult(
            fix=template % values,
            obj=keywords["field"],
            id="fields.lazy_reference_not_resolvable",
        )

    def generic_error(
        model_key: tuple[str, str],
        func: Callable[..., Any],
        args: list[Any],
        keywords: dict[str, Any],
    ) -> PreflightResult:
        """Build the fallback result for any other kind of lazy operation."""
        template = (
            "%(op)s contains a lazy reference to %(model)s, but %(model_error)s."
        )
        values = dict(
            op=func,
            model=".".join(model_key),
            model_error=describe_missing_model(model_key),
        )
        return PreflightResult(
            fix=template % values,
            obj=func,
            id="postgres.lazy_reference_resolution_failed",
        )

    # Dispatch table keyed by (module, function name) of the unwrapped lazy
    # operation. A key mapped to None suppresses the error entirely; any key
    # not listed here falls back to generic_error().
    known_lazy = {
        ("plain.postgres.fields.related", "resolve_related_class"): field_error,
    }

    def error_for(
        model_key: tuple[str, str],
        func: Callable[..., Any],
        args: list[Any],
        keywords: dict[str, Any],
    ) -> PreflightResult | None:
        """Pick the builder for this operation and run it, if there is one."""
        key = (func.__module__, func.__name__)  # ty: ignore[unresolved-attribute]
        builder = known_lazy.get(key, generic_error)
        if not builder:
            return None
        return builder(model_key, func, args, keywords)

    results = []
    for model_key in pending_models:
        for pending in models_registry._pending_operations[model_key]:
            result = error_for(model_key, *unwrap_operation(pending))
            if result:
                results.append(result)

    # Set iteration order is arbitrary; sorting on the message text makes the
    # output order depend on the messages rather than on that iteration.
    results.sort(key=lambda result: result.fix)
    return results
191
192
@register_check("postgres.lazy_references")
class CheckLazyReferences(PreflightCheck):
    """Preflight check for lazy (string) model references left unresolved."""

    def run(self) -> list[PreflightResult]:
        """Scan the module-level registries and return the resulting findings."""
        unresolved = _check_lazy_references(models_registry, packages_registry)
        return unresolved