1from __future__ import annotations
2
3import pkgutil
4import sys
5from importlib import import_module, reload
6from typing import TYPE_CHECKING, Any
7
8from plain.models.connections import DatabaseConnection
9from plain.models.migrations.graph import MigrationGraph
10from plain.models.migrations.recorder import MigrationRecorder
11from plain.packages import packages_registry
12
13from .exceptions import (
14 AmbiguityError,
15 BadMigrationError,
16 InconsistentMigrationHistory,
17 NodeNotFoundError,
18)
19
20if TYPE_CHECKING:
21 from plain.models.backends.base.base import BaseDatabaseWrapper
22 from plain.models.migrations.migration import Migration
23
24MIGRATIONS_MODULE_NAME = "migrations"
25
26
27class MigrationLoader:
28 """
29 Load migration files from disk and their status from the database.
30
31 Migration files are expected to live in the "migrations" directory of
32 an app. Their names are entirely unimportant from a code perspective,
33 but will probably follow the 1234_name.py convention.
34
35 On initialization, this class will scan those directories, and open and
36 read the Python files, looking for a class called Migration, which should
37 inherit from plain.models.migrations.Migration. See
38 plain.models.migrations.migration for what that looks like.
39
40 Some migrations will be marked as "replacing" another set of migrations.
41 These are loaded into a separate set of migrations away from the main ones.
42 If all the migrations they replace are either unapplied or missing from
43 disk, then they are injected into the main set, replacing the named migrations.
44 Any dependency pointers to the replaced migrations are re-pointed to the
45 new migration.
46
47 This does mean that this class MUST also talk to the database as well as
48 to disk, but this is probably fine. We're already not just operating
49 in memory.
50 """
51
52 def __init__(
53 self,
54 connection: BaseDatabaseWrapper | DatabaseConnection | None,
55 load: bool = True,
56 ignore_no_migrations: bool = False,
57 replace_migrations: bool = True,
58 ):
59 self.connection = connection
60 self.disk_migrations: dict[tuple[str, str], Migration] | None = None
61 self.applied_migrations: dict[tuple[str, str], Any] | None = None
62 self.ignore_no_migrations = ignore_no_migrations
63 self.replace_migrations = replace_migrations
64 self.unmigrated_packages: set[str]
65 self.migrated_packages: set[str]
66 self.graph: MigrationGraph
67 self.replacements: dict[tuple[str, str], Migration]
68 if load:
69 self.build_graph()
70
71 @classmethod
72 def migrations_module(cls, package_label: str) -> tuple[str | None, bool]:
73 """
74 Return the path to the migrations module for the specified package_label
75 and a boolean indicating if the module is specified in
76 settings.MIGRATION_MODULE.
77 """
78
79 # This package (plain-models) has different code under migrations/
80 if package_label == "plainmodels":
81 return None, True
82
83 app = packages_registry.get_package_config(package_label)
84 return f"{app.name}.{MIGRATIONS_MODULE_NAME}", False
85
86 def load_disk(self) -> None:
87 """Load the migrations from all INSTALLED_PACKAGES from disk."""
88 self.disk_migrations = {}
89 self.unmigrated_packages = set()
90 self.migrated_packages = set()
91 for package_config in packages_registry.get_package_configs():
92 # Get the migrations module directory
93 module_name, explicit = self.migrations_module(package_config.package_label)
94 if module_name is None:
95 self.unmigrated_packages.add(package_config.package_label)
96 continue
97 was_loaded = module_name in sys.modules
98 try:
99 module = import_module(module_name)
100 except ModuleNotFoundError as e:
101 if (explicit and self.ignore_no_migrations) or (
102 not explicit
103 and e.name is not None
104 and MIGRATIONS_MODULE_NAME in e.name.split(".")
105 ):
106 self.unmigrated_packages.add(package_config.package_label)
107 continue
108 raise
109 else:
110 # Module is not a package (e.g. migrations.py).
111 if not hasattr(module, "__path__"):
112 self.unmigrated_packages.add(package_config.package_label)
113 continue
114 # Empty directories are namespaces. Namespace packages have no
115 # __file__ and don't use a list for __path__. See
116 # https://docs.python.org/3/reference/import.html#namespace-packages
117 if getattr(module, "__file__", None) is None and not isinstance(
118 module.__path__, list
119 ):
120 self.unmigrated_packages.add(package_config.package_label)
121 continue
122 # Force a reload if it's already loaded (tests need this)
123 if was_loaded:
124 reload(module)
125 self.migrated_packages.add(package_config.package_label)
126 migration_names = {
127 name
128 for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
129 if not is_pkg and name[0] not in "_~"
130 }
131 # Load migrations
132 for migration_name in migration_names:
133 migration_path = f"{module_name}.{migration_name}"
134 try:
135 migration_module = import_module(migration_path)
136 except ImportError as e:
137 if "bad magic number" in str(e):
138 raise ImportError(
139 f"Couldn't import {migration_path!r} as it appears to be a stale "
140 ".pyc file."
141 ) from e
142 else:
143 raise
144 if not hasattr(migration_module, "Migration"):
145 raise BadMigrationError(
146 f"Migration {migration_name} in app {package_config.package_label} has no Migration class"
147 )
148 self.disk_migrations[package_config.package_label, migration_name] = (
149 migration_module.Migration( # type: ignore[call-non-callable]
150 migration_name,
151 package_config.package_label,
152 )
153 )
154
155 def get_migration(self, package_label: str, name_prefix: str) -> Migration | None:
156 """Return the named migration or raise NodeNotFoundError."""
157 return self.graph.nodes[package_label, name_prefix]
158
159 def get_migration_by_prefix(
160 self, package_label: str, name_prefix: str
161 ) -> Migration:
162 """
163 Return the migration(s) which match the given app label and name_prefix.
164 """
165 # Do the search
166 results = []
167 for migration_package_label, migration_name in self.disk_migrations:
168 if migration_package_label == package_label and migration_name.startswith(
169 name_prefix
170 ):
171 results.append((migration_package_label, migration_name))
172 if len(results) > 1:
173 raise AmbiguityError(
174 f"There is more than one migration for '{package_label}' with the prefix '{name_prefix}'"
175 )
176 elif not results:
177 raise KeyError(
178 f"There is no migration for '{package_label}' with the prefix "
179 f"'{name_prefix}'"
180 )
181 else:
182 return self.disk_migrations[results[0]]
183
184 def check_key(
185 self, key: tuple[str, str], current_package: str
186 ) -> tuple[str, str] | None:
187 if (key[1] != "__first__" and key[1] != "__latest__") or key in self.graph:
188 return key
189 # Special-case __first__, which means "the first migration" for
190 # migrated packages, and is ignored for unmigrated packages. It allows
191 # makemigrations to declare dependencies on packages before they even have
192 # migrations.
193 if key[0] == current_package:
194 # Ignore __first__ references to the same app (#22325)
195 return None
196 if key[0] in self.unmigrated_packages:
197 # This app isn't migrated, but something depends on it.
198 # The models will get auto-added into the state, though
199 # so we're fine.
200 return None
201 if key[0] in self.migrated_packages:
202 try:
203 if key[1] == "__first__":
204 return self.graph.root_nodes(key[0])[0]
205 else: # "__latest__"
206 return self.graph.leaf_nodes(key[0])[0]
207 except IndexError:
208 if self.ignore_no_migrations:
209 return None
210 else:
211 raise ValueError(f"Dependency on app with no migrations: {key[0]}")
212 raise ValueError(f"Dependency on unknown app: {key[0]}")
213
214 def add_internal_dependencies(
215 self, key: tuple[str, str], migration: Migration
216 ) -> None:
217 """
218 Internal dependencies need to be added first to ensure `__first__`
219 dependencies find the correct root node.
220 """
221 for parent in migration.dependencies:
222 # Ignore __first__ references to the same app.
223 if parent[0] == key[0] and parent[1] != "__first__":
224 self.graph.add_dependency(migration, key, parent, skip_validation=True)
225
226 def add_external_dependencies(
227 self, key: tuple[str, str], migration: Migration
228 ) -> None:
229 for parent in migration.dependencies:
230 # Skip internal dependencies
231 if key[0] == parent[0]:
232 continue
233 parent = self.check_key(parent, key[0])
234 if parent is not None:
235 self.graph.add_dependency(migration, key, parent, skip_validation=True)
236
237 def build_graph(self) -> None:
238 """
239 Build a migration dependency graph using both the disk and database.
240 You'll need to rebuild the graph if you apply migrations. This isn't
241 usually a problem as generally migration stuff runs in a one-shot process.
242 """
243 # Load disk data
244 self.load_disk()
245 # Load database data
246 if self.connection is None:
247 self.applied_migrations = {}
248 else:
249 recorder = MigrationRecorder(self.connection)
250 self.applied_migrations = recorder.applied_migrations()
251 # To start, populate the migration graph with nodes for ALL migrations
252 # and their dependencies. Also make note of replacing migrations at this step.
253 self.graph = MigrationGraph()
254 self.replacements = {}
255 for key, migration in self.disk_migrations.items():
256 self.graph.add_node(key, migration)
257 # Replacing migrations.
258 if migration.replaces:
259 self.replacements[key] = migration
260 for key, migration in self.disk_migrations.items():
261 # Internal (same app) dependencies.
262 self.add_internal_dependencies(key, migration)
263 # Add external dependencies now that the internal ones have been resolved.
264 for key, migration in self.disk_migrations.items():
265 self.add_external_dependencies(key, migration)
266 # Carry out replacements where possible and if enabled.
267 if self.replace_migrations:
268 for key, migration in self.replacements.items():
269 # Get applied status of each of this migration's replacement
270 # targets.
271 applied_statuses = [
272 (target in self.applied_migrations) for target in migration.replaces
273 ]
274 # The replacing migration is only marked as applied if all of
275 # its replacement targets are.
276 if all(applied_statuses):
277 self.applied_migrations[key] = migration
278 else:
279 self.applied_migrations.pop(key, None)
280 # A replacing migration can be used if either all or none of
281 # its replacement targets have been applied.
282 if all(applied_statuses) or (not any(applied_statuses)):
283 self.graph.remove_replaced_nodes(key, migration.replaces)
284 else:
285 # This replacing migration cannot be used because it is
286 # partially applied. Remove it from the graph and remap
287 # dependencies to it (#25945).
288 self.graph.remove_replacement_node(key, migration.replaces)
289 # Ensure the graph is consistent.
290 try:
291 self.graph.validate_consistency()
292 except NodeNotFoundError as exc:
293 # Check if the missing node could have been replaced by any squash
294 # migration but wasn't because the squash migration was partially
295 # applied before. In that case raise a more understandable exception
296 # (#23556).
297 # Get reverse replacements.
298 reverse_replacements = {}
299 for key, migration in self.replacements.items():
300 for replaced in migration.replaces:
301 reverse_replacements.setdefault(replaced, set()).add(key)
302 # Try to reraise exception with more detail.
303 if exc.node in reverse_replacements:
304 candidates = reverse_replacements.get(exc.node, set())
305 is_replaced = any(
306 candidate in self.graph.nodes for candidate in candidates
307 )
308 if not is_replaced:
309 tries = ", ".join("{}.{}".format(*c) for c in candidates)
310 raise NodeNotFoundError(
311 f"Migration {exc.origin} depends on nonexistent node ('{exc.node[0]}', '{exc.node[1]}'). "
312 f"Plain tried to replace migration {exc.node[0]}.{exc.node[1]} with any of [{tries}] "
313 "but wasn't able to because some of the replaced migrations "
314 "are already applied.",
315 exc.node,
316 ) from exc
317 raise
318 self.graph.ensure_not_cyclic()
319
320 def check_consistent_history(
321 self, connection: BaseDatabaseWrapper | DatabaseConnection
322 ) -> None:
323 """
324 Raise InconsistentMigrationHistory if any applied migrations have
325 unapplied dependencies.
326 """
327 recorder = MigrationRecorder(connection)
328 applied = recorder.applied_migrations()
329 for migration in applied:
330 # If the migration is unknown, skip it.
331 if migration not in self.graph.nodes:
332 continue
333 for parent in self.graph.node_map[migration].parents:
334 if parent not in applied:
335 # Skip unapplied squashed migrations that have all of their
336 # `replaces` applied.
337 if parent in self.replacements:
338 if all(
339 m in applied for m in self.replacements[parent].replaces
340 ):
341 continue
342 raise InconsistentMigrationHistory(
343 f"Migration {migration[0]}.{migration[1]} is applied before its dependency "
344 f"{parent[0]}.{parent[1]} on the database."
345 )
346
347 def detect_conflicts(self) -> dict[str, list[str]]:
348 """
349 Look through the loaded graph and detect any conflicts - packages
350 with more than one leaf migration. Return a dict of the app labels
351 that conflict with the migration names that conflict.
352 """
353 seen_packages = {}
354 conflicting_packages = set()
355 for package_label, migration_name in self.graph.leaf_nodes():
356 if package_label in seen_packages:
357 conflicting_packages.add(package_label)
358 seen_packages.setdefault(package_label, set()).add(migration_name)
359 return {
360 package_label: sorted(seen_packages[package_label])
361 for package_label in conflicting_packages
362 }
363
364 def project_state(
365 self, nodes: tuple[str, str] | None = None, at_end: bool = True
366 ) -> Any:
367 """
368 Return a ProjectState object representing the most recent state
369 that the loaded migrations represent.
370
371 See graph.make_state() for the meaning of "nodes" and "at_end".
372 """
373 return self.graph.make_state(
374 nodes=nodes, at_end=at_end, real_packages=self.unmigrated_packages
375 )