Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import pkgutil
  4import sys
  5from importlib import import_module, reload
  6from typing import TYPE_CHECKING, Any
  7
  8from plain.models.connections import DatabaseConnection
  9from plain.models.migrations.graph import MigrationGraph
 10from plain.models.migrations.recorder import MigrationRecorder
 11from plain.packages import packages_registry
 12
 13from .exceptions import (
 14    AmbiguityError,
 15    BadMigrationError,
 16    InconsistentMigrationHistory,
 17    NodeNotFoundError,
 18)
 19
 20if TYPE_CHECKING:
 21    from plain.models.backends.base.base import BaseDatabaseWrapper
 22    from plain.models.migrations.migration import Migration
 23
# Name of the submodule, appended to a package's dotted name, that is expected
# to hold that package's migration files.
MIGRATIONS_MODULE_NAME = "migrations"
 25
 26
 27class MigrationLoader:
 28    """
 29    Load migration files from disk and their status from the database.
 30
 31    Migration files are expected to live in the "migrations" directory of
 32    an app. Their names are entirely unimportant from a code perspective,
 33    but will probably follow the 1234_name.py convention.
 34
 35    On initialization, this class will scan those directories, and open and
 36    read the Python files, looking for a class called Migration, which should
 37    inherit from plain.models.migrations.Migration. See
 38    plain.models.migrations.migration for what that looks like.
 39
 40    Some migrations will be marked as "replacing" another set of migrations.
 41    These are loaded into a separate set of migrations away from the main ones.
 42    If all the migrations they replace are either unapplied or missing from
 43    disk, then they are injected into the main set, replacing the named migrations.
 44    Any dependency pointers to the replaced migrations are re-pointed to the
 45    new migration.
 46
 47    This does mean that this class MUST also talk to the database as well as
 48    to disk, but this is probably fine. We're already not just operating
 49    in memory.
 50    """
 51
 52    def __init__(
 53        self,
 54        connection: BaseDatabaseWrapper | DatabaseConnection | None,
 55        load: bool = True,
 56        ignore_no_migrations: bool = False,
 57        replace_migrations: bool = True,
 58    ):
 59        self.connection = connection
 60        self.disk_migrations: dict[tuple[str, str], Migration] | None = None
 61        self.applied_migrations: dict[tuple[str, str], Any] | None = None
 62        self.ignore_no_migrations = ignore_no_migrations
 63        self.replace_migrations = replace_migrations
 64        self.unmigrated_packages: set[str]
 65        self.migrated_packages: set[str]
 66        self.graph: MigrationGraph
 67        self.replacements: dict[tuple[str, str], Migration]
 68        if load:
 69            self.build_graph()
 70
 71    @classmethod
 72    def migrations_module(cls, package_label: str) -> tuple[str | None, bool]:
 73        """
 74        Return the path to the migrations module for the specified package_label
 75        and a boolean indicating if the module is specified in
 76        settings.MIGRATION_MODULE.
 77        """
 78
 79        # This package (plain-models) has different code under migrations/
 80        if package_label == "plainmodels":
 81            return None, True
 82
 83        app = packages_registry.get_package_config(package_label)
 84        return f"{app.name}.{MIGRATIONS_MODULE_NAME}", False
 85
    def load_disk(self) -> None:
        """
        Load the migrations from all INSTALLED_PACKAGES from disk.

        Resets `disk_migrations`, `unmigrated_packages` and
        `migrated_packages`, then walks every registered package config.
        Packages without a usable migrations package are recorded as
        unmigrated. For the others, every migration module is imported and its
        `Migration` attribute is called with `(migration_name, package_label)`;
        the result is stored in `disk_migrations` under the key
        `(package_label, migration_name)`.

        Raises ImportError (with a clearer message) for a stale .pyc migration,
        re-raises any other import failure, and raises BadMigrationError when a
        migration module has no `Migration` attribute.
        """
        self.disk_migrations = {}
        self.unmigrated_packages = set()
        self.migrated_packages = set()
        for package_config in packages_registry.get_package_configs():
            # Get the migrations module directory
            module_name, explicit = self.migrations_module(package_config.package_label)
            if module_name is None:
                self.unmigrated_packages.add(package_config.package_label)
                continue
            # Remember whether the module was already imported before this
            # call, so it can be reloaded below instead of reusing stale state.
            was_loaded = module_name in sys.modules
            try:
                module = import_module(module_name)
            except ModuleNotFoundError as e:
                # Treat the package as unmigrated when either the location was
                # explicit and missing migrations are tolerated, or the module
                # that could not be found has a "migrations" component in its
                # dotted name. Any other missing module is a genuine error.
                if (explicit and self.ignore_no_migrations) or (
                    not explicit
                    and e.name is not None
                    and MIGRATIONS_MODULE_NAME in e.name.split(".")
                ):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                raise
            else:
                # Module is not a package (e.g. migrations.py).
                if not hasattr(module, "__path__"):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                # Empty directories are namespaces. Namespace packages have no
                # __file__ and don't use a list for __path__. See
                # https://docs.python.org/3/reference/import.html#namespace-packages
                if getattr(module, "__file__", None) is None and not isinstance(
                    module.__path__, list
                ):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                # Force a reload if it's already loaded (tests need this)
                if was_loaded:
                    reload(module)
            self.migrated_packages.add(package_config.package_label)
            # Candidate migrations are the plain modules (not sub-packages)
            # directly inside the migrations package whose names do not start
            # with "_" or "~".
            migration_names = {
                name
                for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
                if not is_pkg and name[0] not in "_~"
            }
            # Load migrations
            for migration_name in migration_names:
                migration_path = f"{module_name}.{migration_name}"
                try:
                    migration_module = import_module(migration_path)
                except ImportError as e:
                    # Replace the "bad magic number" failure with a message
                    # that names the stale .pyc; other errors pass through.
                    if "bad magic number" in str(e):
                        raise ImportError(
                            f"Couldn't import {migration_path!r} as it appears to be a stale "
                            ".pyc file."
                        ) from e
                    else:
                        raise
                if not hasattr(migration_module, "Migration"):
                    raise BadMigrationError(
                        f"Migration {migration_name} in app {package_config.package_label} has no Migration class"
                    )
                self.disk_migrations[package_config.package_label, migration_name] = (
                    migration_module.Migration(  # type: ignore[call-non-callable]
                        migration_name,
                        package_config.package_label,
                    )
                )
154
155    def get_migration(self, package_label: str, name_prefix: str) -> Migration | None:
156        """Return the named migration or raise NodeNotFoundError."""
157        return self.graph.nodes[package_label, name_prefix]
158
159    def get_migration_by_prefix(
160        self, package_label: str, name_prefix: str
161    ) -> Migration:
162        """
163        Return the migration(s) which match the given app label and name_prefix.
164        """
165        # Do the search
166        results = []
167        for migration_package_label, migration_name in self.disk_migrations:
168            if migration_package_label == package_label and migration_name.startswith(
169                name_prefix
170            ):
171                results.append((migration_package_label, migration_name))
172        if len(results) > 1:
173            raise AmbiguityError(
174                f"There is more than one migration for '{package_label}' with the prefix '{name_prefix}'"
175            )
176        elif not results:
177            raise KeyError(
178                f"There is no migration for '{package_label}' with the prefix "
179                f"'{name_prefix}'"
180            )
181        else:
182            return self.disk_migrations[results[0]]
183
184    def check_key(
185        self, key: tuple[str, str], current_package: str
186    ) -> tuple[str, str] | None:
187        if (key[1] != "__first__" and key[1] != "__latest__") or key in self.graph:
188            return key
189        # Special-case __first__, which means "the first migration" for
190        # migrated packages, and is ignored for unmigrated packages. It allows
191        # makemigrations to declare dependencies on packages before they even have
192        # migrations.
193        if key[0] == current_package:
194            # Ignore __first__ references to the same app (#22325)
195            return None
196        if key[0] in self.unmigrated_packages:
197            # This app isn't migrated, but something depends on it.
198            # The models will get auto-added into the state, though
199            # so we're fine.
200            return None
201        if key[0] in self.migrated_packages:
202            try:
203                if key[1] == "__first__":
204                    return self.graph.root_nodes(key[0])[0]
205                else:  # "__latest__"
206                    return self.graph.leaf_nodes(key[0])[0]
207            except IndexError:
208                if self.ignore_no_migrations:
209                    return None
210                else:
211                    raise ValueError(f"Dependency on app with no migrations: {key[0]}")
212        raise ValueError(f"Dependency on unknown app: {key[0]}")
213
214    def add_internal_dependencies(
215        self, key: tuple[str, str], migration: Migration
216    ) -> None:
217        """
218        Internal dependencies need to be added first to ensure `__first__`
219        dependencies find the correct root node.
220        """
221        for parent in migration.dependencies:
222            # Ignore __first__ references to the same app.
223            if parent[0] == key[0] and parent[1] != "__first__":
224                self.graph.add_dependency(migration, key, parent, skip_validation=True)
225
226    def add_external_dependencies(
227        self, key: tuple[str, str], migration: Migration
228    ) -> None:
229        for parent in migration.dependencies:
230            # Skip internal dependencies
231            if key[0] == parent[0]:
232                continue
233            parent = self.check_key(parent, key[0])
234            if parent is not None:
235                self.graph.add_dependency(migration, key, parent, skip_validation=True)
236
    def build_graph(self) -> None:
        """
        Build a migration dependency graph using both the disk and database.
        You'll need to rebuild the graph if you apply migrations. This isn't
        usually a problem as generally migration stuff runs in a one-shot process.

        Side effects: reloads `disk_migrations` via load_disk(), then replaces
        `applied_migrations`, `graph` and `replacements`. With no connection,
        the applied set is empty.

        Raises NodeNotFoundError when the graph's consistency validation fails;
        if the missing node was a replacement target that no replacing
        migration in the graph covers, the error is re-raised with a more
        detailed message. The final step asks the graph to confirm it is not
        cyclic.
        """
        # Load disk data
        self.load_disk()
        # Load database data
        if self.connection is None:
            self.applied_migrations = {}
        else:
            recorder = MigrationRecorder(self.connection)
            self.applied_migrations = recorder.applied_migrations()
        # To start, populate the migration graph with nodes for ALL migrations
        # and their dependencies. Also make note of replacing migrations at this step.
        self.graph = MigrationGraph()
        self.replacements = {}
        for key, migration in self.disk_migrations.items():
            self.graph.add_node(key, migration)
            # Replacing migrations.
            if migration.replaces:
                self.replacements[key] = migration
        # Edges are added in two separate passes over all migrations: every
        # internal edge first, then the external ones.
        for key, migration in self.disk_migrations.items():
            # Internal (same app) dependencies.
            self.add_internal_dependencies(key, migration)
        # Add external dependencies now that the internal ones have been resolved.
        for key, migration in self.disk_migrations.items():
            self.add_external_dependencies(key, migration)
        # Carry out replacements where possible and if enabled.
        if self.replace_migrations:
            for key, migration in self.replacements.items():
                # Get applied status of each of this migration's replacement
                # targets.
                applied_statuses = [
                    (target in self.applied_migrations) for target in migration.replaces
                ]
                # The replacing migration is only marked as applied if all of
                # its replacement targets are.
                if all(applied_statuses):
                    self.applied_migrations[key] = migration
                else:
                    self.applied_migrations.pop(key, None)
                # A replacing migration can be used if either all or none of
                # its replacement targets have been applied.
                if all(applied_statuses) or (not any(applied_statuses)):
                    self.graph.remove_replaced_nodes(key, migration.replaces)
                else:
                    # This replacing migration cannot be used because it is
                    # partially applied. Remove it from the graph and remap
                    # dependencies to it (#25945).
                    self.graph.remove_replacement_node(key, migration.replaces)
        # Ensure the graph is consistent.
        try:
            self.graph.validate_consistency()
        except NodeNotFoundError as exc:
            # Check if the missing node could have been replaced by any squash
            # migration but wasn't because the squash migration was partially
            # applied before. In that case raise a more understandable exception
            # (#23556).
            # Get reverse replacements.
            # Maps each replaced key to the set of keys of the migrations that
            # declare they replace it.
            reverse_replacements = {}
            for key, migration in self.replacements.items():
                for replaced in migration.replaces:
                    reverse_replacements.setdefault(replaced, set()).add(key)
            # Try to reraise exception with more detail.
            if exc.node in reverse_replacements:
                candidates = reverse_replacements.get(exc.node, set())
                is_replaced = any(
                    candidate in self.graph.nodes for candidate in candidates
                )
                if not is_replaced:
                    tries = ", ".join("{}.{}".format(*c) for c in candidates)
                    raise NodeNotFoundError(
                        f"Migration {exc.origin} depends on nonexistent node ('{exc.node[0]}', '{exc.node[1]}'). "
                        f"Plain tried to replace migration {exc.node[0]}.{exc.node[1]} with any of [{tries}] "
                        "but wasn't able to because some of the replaced migrations "
                        "are already applied.",
                        exc.node,
                    ) from exc
            # No better explanation available: propagate the original error.
            raise
        self.graph.ensure_not_cyclic()
319
320    def check_consistent_history(
321        self, connection: BaseDatabaseWrapper | DatabaseConnection
322    ) -> None:
323        """
324        Raise InconsistentMigrationHistory if any applied migrations have
325        unapplied dependencies.
326        """
327        recorder = MigrationRecorder(connection)
328        applied = recorder.applied_migrations()
329        for migration in applied:
330            # If the migration is unknown, skip it.
331            if migration not in self.graph.nodes:
332                continue
333            for parent in self.graph.node_map[migration].parents:
334                if parent not in applied:
335                    # Skip unapplied squashed migrations that have all of their
336                    # `replaces` applied.
337                    if parent in self.replacements:
338                        if all(
339                            m in applied for m in self.replacements[parent].replaces
340                        ):
341                            continue
342                    raise InconsistentMigrationHistory(
343                        f"Migration {migration[0]}.{migration[1]} is applied before its dependency "
344                        f"{parent[0]}.{parent[1]} on the database."
345                    )
346
347    def detect_conflicts(self) -> dict[str, list[str]]:
348        """
349        Look through the loaded graph and detect any conflicts - packages
350        with more than one leaf migration. Return a dict of the app labels
351        that conflict with the migration names that conflict.
352        """
353        seen_packages = {}
354        conflicting_packages = set()
355        for package_label, migration_name in self.graph.leaf_nodes():
356            if package_label in seen_packages:
357                conflicting_packages.add(package_label)
358            seen_packages.setdefault(package_label, set()).add(migration_name)
359        return {
360            package_label: sorted(seen_packages[package_label])
361            for package_label in conflicting_packages
362        }
363
364    def project_state(
365        self, nodes: tuple[str, str] | None = None, at_end: bool = True
366    ) -> Any:
367        """
368        Return a ProjectState object representing the most recent state
369        that the loaded migrations represent.
370
371        See graph.make_state() for the meaning of "nodes" and "at_end".
372        """
373        return self.graph.make_state(
374            nodes=nodes, at_end=at_end, real_packages=self.unmigrated_packages
375        )