Plain is headed towards 1.0! Subscribe for development updates →

  1import inspect
  2from collections import defaultdict
  3from itertools import chain
  4
  5from plain.models.registry import models_registry
  6from plain.packages import packages_registry
  7from plain.preflight import Error, Warning, register_check
  8
  9
 10@register_check
 11def check_database_backends(database=False, **kwargs):
 12    if not database:
 13        return []
 14
 15    from plain.models.db import db_connection
 16
 17    return db_connection.validation.check(**kwargs)
 18
 19
 20@register_check
 21def check_all_models(package_configs=None, **kwargs):
 22    db_table_models = defaultdict(list)
 23    indexes = defaultdict(list)
 24    constraints = defaultdict(list)
 25    errors = []
 26    if package_configs is None:
 27        models = models_registry.get_models()
 28    else:
 29        models = chain.from_iterable(
 30            models_registry.get_models(package_label=package_config.package_label)
 31            for package_config in package_configs
 32        )
 33    for model in models:
 34        db_table_models[model._meta.db_table].append(model._meta.label)
 35        if not inspect.ismethod(model.check):
 36            errors.append(
 37                Error(
 38                    f"The '{model.__name__}.check()' class method is currently overridden by {model.check!r}.",
 39                    obj=model,
 40                    id="models.E020",
 41                )
 42            )
 43        else:
 44            errors.extend(model.check(**kwargs))
 45        for model_index in model._meta.indexes:
 46            indexes[model_index.name].append(model._meta.label)
 47        for model_constraint in model._meta.constraints:
 48            constraints[model_constraint.name].append(model._meta.label)
 49    for db_table, model_labels in db_table_models.items():
 50        if len(model_labels) != 1:
 51            model_labels_str = ", ".join(model_labels)
 52            errors.append(
 53                Error(
 54                    f"db_table '{db_table}' is used by multiple models: {model_labels_str}.",
 55                    obj=db_table,
 56                    id="models.E028",
 57                )
 58            )
 59    for index_name, model_labels in indexes.items():
 60        if len(model_labels) > 1:
 61            model_labels = set(model_labels)
 62            errors.append(
 63                Error(
 64                    "index name '{}' is not unique {} {}.".format(
 65                        index_name,
 66                        "for model" if len(model_labels) == 1 else "among models:",
 67                        ", ".join(sorted(model_labels)),
 68                    ),
 69                    id="models.E029" if len(model_labels) == 1 else "models.E030",
 70                ),
 71            )
 72    for constraint_name, model_labels in constraints.items():
 73        if len(model_labels) > 1:
 74            model_labels = set(model_labels)
 75            errors.append(
 76                Error(
 77                    "constraint name '{}' is not unique {} {}.".format(
 78                        constraint_name,
 79                        "for model" if len(model_labels) == 1 else "among models:",
 80                        ", ".join(sorted(model_labels)),
 81                    ),
 82                    id="models.E031" if len(model_labels) == 1 else "models.E032",
 83                ),
 84            )
 85    return errors
 86
 87
 88def _check_lazy_references(models_registry, packages_registry):
 89    """
 90    Ensure all lazy (i.e. string) model references have been resolved.
 91
 92    Lazy references are used in various places throughout Plain, primarily in
 93    related fields and model signals. Identify those common cases and provide
 94    more helpful error messages for them.
 95    """
 96    pending_models = set(models_registry._pending_operations)
 97
 98    # Short circuit if there aren't any errors.
 99    if not pending_models:
100        return []
101
102    def extract_operation(obj):
103        """
104        Take a callable found in Packages._pending_operations and identify the
105        original callable passed to Packages.lazy_model_operation(). If that
106        callable was a partial, return the inner, non-partial function and
107        any arguments and keyword arguments that were supplied with it.
108
109        obj is a callback defined locally in Packages.lazy_model_operation() and
110        annotated there with a `func` attribute so as to imitate a partial.
111        """
112        operation, args, keywords = obj, [], {}
113        while hasattr(operation, "func"):
114            args.extend(getattr(operation, "args", []))
115            keywords.update(getattr(operation, "keywords", {}))
116            operation = operation.func
117        return operation, args, keywords
118
119    def app_model_error(model_key):
120        try:
121            packages_registry.get_package_config(model_key[0])
122            model_error = "app '{}' doesn't provide model '{}'".format(*model_key)
123        except LookupError:
124            model_error = f"app '{model_key[0]}' isn't installed"
125        return model_error
126
127    # Here are several functions which return CheckMessage instances for the
128    # most common usages of lazy operations throughout Plain. These functions
129    # take the model that was being waited on as an (package_label, modelname)
130    # pair, the original lazy function, and its positional and keyword args as
131    # determined by extract_operation().
132
133    def field_error(model_key, func, args, keywords):
134        error_msg = (
135            "The field %(field)s was declared with a lazy reference "
136            "to '%(model)s', but %(model_error)s."
137        )
138        params = {
139            "model": ".".join(model_key),
140            "field": keywords["field"],
141            "model_error": app_model_error(model_key),
142        }
143        return Error(error_msg % params, obj=keywords["field"], id="fields.E307")
144
145    def default_error(model_key, func, args, keywords):
146        error_msg = (
147            "%(op)s contains a lazy reference to %(model)s, but %(model_error)s."
148        )
149        params = {
150            "op": func,
151            "model": ".".join(model_key),
152            "model_error": app_model_error(model_key),
153        }
154        return Error(error_msg % params, obj=func, id="models.E022")
155
156    # Maps common uses of lazy operations to corresponding error functions
157    # defined above. If a key maps to None, no error will be produced.
158    # default_error() will be used for usages that don't appear in this dict.
159    known_lazy = {
160        ("plain.models.fields.related", "resolve_related_class"): field_error,
161    }
162
163    def build_error(model_key, func, args, keywords):
164        key = (func.__module__, func.__name__)
165        error_fn = known_lazy.get(key, default_error)
166        return error_fn(model_key, func, args, keywords) if error_fn else None
167
168    return sorted(
169        filter(
170            None,
171            (
172                build_error(model_key, *extract_operation(func))
173                for model_key in pending_models
174                for func in models_registry._pending_operations[model_key]
175            ),
176        ),
177        key=lambda error: error.msg,
178    )
179
180
181@register_check
182def check_lazy_references(package_configs=None, **kwargs):
183    return _check_lazy_references(models_registry, packages_registry)
184
185
186@register_check
187def check_database_tables(package_configs, **kwargs):
188    from plain.models.db import db_connection
189
190    if not kwargs.get("database", False):
191        return []
192
193    errors = []
194
195    db_tables = db_connection.introspection.table_names()
196    model_tables = db_connection.introspection.plain_table_names()
197    unknown_tables = set(db_tables) - set(model_tables)
198    unknown_tables.discard("plainmigrations")  # Know this could be there
199    if unknown_tables:
200        table_names = ", ".join(unknown_tables)
201        specific_hint = f'echo "DROP TABLE IF EXISTS {unknown_tables.pop()}" | plain models db-shell'
202        errors.append(
203            Warning(
204                f"Unknown tables in default database: {table_names}",
205                hint=(
206                    "Tables may be from packages/models that have been uninstalled. "
207                    "Make sure you have a backup and delete the tables manually "
208                    f"(ex. `{specific_hint}`)."
209                ),
210                id="plain.models.W001",
211            )
212        )
213
214    return errors