1import inspect
2from collections import defaultdict
3from itertools import chain
4
5from plain.models.registry import models_registry
6from plain.packages import packages_registry
7from plain.preflight import Error, Warning, register_check
8
9
10@register_check
11def check_database_backends(database=False, **kwargs):
12 if not database:
13 return []
14
15 from plain.models.db import db_connection
16
17 return db_connection.validation.check(**kwargs)
18
19
20@register_check
21def check_all_models(package_configs=None, **kwargs):
22 db_table_models = defaultdict(list)
23 indexes = defaultdict(list)
24 constraints = defaultdict(list)
25 errors = []
26 if package_configs is None:
27 models = models_registry.get_models()
28 else:
29 models = chain.from_iterable(
30 models_registry.get_models(package_label=package_config.package_label)
31 for package_config in package_configs
32 )
33 for model in models:
34 db_table_models[model._meta.db_table].append(model._meta.label)
35 if not inspect.ismethod(model.check):
36 errors.append(
37 Error(
38 f"The '{model.__name__}.check()' class method is currently overridden by {model.check!r}.",
39 obj=model,
40 id="models.E020",
41 )
42 )
43 else:
44 errors.extend(model.check(**kwargs))
45 for model_index in model._meta.indexes:
46 indexes[model_index.name].append(model._meta.label)
47 for model_constraint in model._meta.constraints:
48 constraints[model_constraint.name].append(model._meta.label)
49 for db_table, model_labels in db_table_models.items():
50 if len(model_labels) != 1:
51 model_labels_str = ", ".join(model_labels)
52 errors.append(
53 Error(
54 f"db_table '{db_table}' is used by multiple models: {model_labels_str}.",
55 obj=db_table,
56 id="models.E028",
57 )
58 )
59 for index_name, model_labels in indexes.items():
60 if len(model_labels) > 1:
61 model_labels = set(model_labels)
62 errors.append(
63 Error(
64 "index name '{}' is not unique {} {}.".format(
65 index_name,
66 "for model" if len(model_labels) == 1 else "among models:",
67 ", ".join(sorted(model_labels)),
68 ),
69 id="models.E029" if len(model_labels) == 1 else "models.E030",
70 ),
71 )
72 for constraint_name, model_labels in constraints.items():
73 if len(model_labels) > 1:
74 model_labels = set(model_labels)
75 errors.append(
76 Error(
77 "constraint name '{}' is not unique {} {}.".format(
78 constraint_name,
79 "for model" if len(model_labels) == 1 else "among models:",
80 ", ".join(sorted(model_labels)),
81 ),
82 id="models.E031" if len(model_labels) == 1 else "models.E032",
83 ),
84 )
85 return errors
86
87
88def _check_lazy_references(models_registry, packages_registry):
89 """
90 Ensure all lazy (i.e. string) model references have been resolved.
91
92 Lazy references are used in various places throughout Plain, primarily in
93 related fields and model signals. Identify those common cases and provide
94 more helpful error messages for them.
95 """
96 pending_models = set(models_registry._pending_operations)
97
98 # Short circuit if there aren't any errors.
99 if not pending_models:
100 return []
101
102 def extract_operation(obj):
103 """
104 Take a callable found in Packages._pending_operations and identify the
105 original callable passed to Packages.lazy_model_operation(). If that
106 callable was a partial, return the inner, non-partial function and
107 any arguments and keyword arguments that were supplied with it.
108
109 obj is a callback defined locally in Packages.lazy_model_operation() and
110 annotated there with a `func` attribute so as to imitate a partial.
111 """
112 operation, args, keywords = obj, [], {}
113 while hasattr(operation, "func"):
114 args.extend(getattr(operation, "args", []))
115 keywords.update(getattr(operation, "keywords", {}))
116 operation = operation.func
117 return operation, args, keywords
118
119 def app_model_error(model_key):
120 try:
121 packages_registry.get_package_config(model_key[0])
122 model_error = "app '{}' doesn't provide model '{}'".format(*model_key)
123 except LookupError:
124 model_error = f"app '{model_key[0]}' isn't installed"
125 return model_error
126
127 # Here are several functions which return CheckMessage instances for the
128 # most common usages of lazy operations throughout Plain. These functions
129 # take the model that was being waited on as an (package_label, modelname)
130 # pair, the original lazy function, and its positional and keyword args as
131 # determined by extract_operation().
132
133 def field_error(model_key, func, args, keywords):
134 error_msg = (
135 "The field %(field)s was declared with a lazy reference "
136 "to '%(model)s', but %(model_error)s."
137 )
138 params = {
139 "model": ".".join(model_key),
140 "field": keywords["field"],
141 "model_error": app_model_error(model_key),
142 }
143 return Error(error_msg % params, obj=keywords["field"], id="fields.E307")
144
145 def default_error(model_key, func, args, keywords):
146 error_msg = (
147 "%(op)s contains a lazy reference to %(model)s, but %(model_error)s."
148 )
149 params = {
150 "op": func,
151 "model": ".".join(model_key),
152 "model_error": app_model_error(model_key),
153 }
154 return Error(error_msg % params, obj=func, id="models.E022")
155
156 # Maps common uses of lazy operations to corresponding error functions
157 # defined above. If a key maps to None, no error will be produced.
158 # default_error() will be used for usages that don't appear in this dict.
159 known_lazy = {
160 ("plain.models.fields.related", "resolve_related_class"): field_error,
161 }
162
163 def build_error(model_key, func, args, keywords):
164 key = (func.__module__, func.__name__)
165 error_fn = known_lazy.get(key, default_error)
166 return error_fn(model_key, func, args, keywords) if error_fn else None
167
168 return sorted(
169 filter(
170 None,
171 (
172 build_error(model_key, *extract_operation(func))
173 for model_key in pending_models
174 for func in models_registry._pending_operations[model_key]
175 ),
176 ),
177 key=lambda error: error.msg,
178 )
179
180
181@register_check
182def check_lazy_references(package_configs=None, **kwargs):
183 return _check_lazy_references(models_registry, packages_registry)
184
185
186@register_check
187def check_database_tables(package_configs, **kwargs):
188 from plain.models.db import db_connection
189
190 if not kwargs.get("database", False):
191 return []
192
193 errors = []
194
195 db_tables = db_connection.introspection.table_names()
196 model_tables = db_connection.introspection.plain_table_names()
197 unknown_tables = set(db_tables) - set(model_tables)
198 unknown_tables.discard("plainmigrations") # Know this could be there
199 if unknown_tables:
200 table_names = ", ".join(unknown_tables)
201 specific_hint = f'echo "DROP TABLE IF EXISTS {unknown_tables.pop()}" | plain models db-shell'
202 errors.append(
203 Warning(
204 f"Unknown tables in default database: {table_names}",
205 hint=(
206 "Tables may be from packages/models that have been uninstalled. "
207 "Make sure you have a backup and delete the tables manually "
208 f"(ex. `{specific_hint}`)."
209 ),
210 id="plain.models.W001",
211 )
212 )
213
214 return errors