1from __future__ import annotations
2
3import pkgutil
4import sys
5from importlib import import_module, reload
6from typing import TYPE_CHECKING, Any
7
8from plain.models.connections import DatabaseConnection
9from plain.models.migrations.graph import MigrationGraph
10from plain.models.migrations.recorder import MigrationRecorder
11from plain.packages import packages_registry
12
13from .exceptions import (
14 AmbiguityError,
15 BadMigrationError,
16 InconsistentMigrationHistory,
17 NodeNotFoundError,
18)
19
20if TYPE_CHECKING:
21 from plain.models.backends.base.base import BaseDatabaseWrapper
22 from plain.models.migrations.migration import Migration
23
24MIGRATIONS_MODULE_NAME = "migrations"
25
26
27class MigrationLoader:
28 """
29 Load migration files from disk and their status from the database.
30
31 Migration files are expected to live in the "migrations" directory of
32 an app. Their names are entirely unimportant from a code perspective,
33 but will probably follow the 1234_name.py convention.
34
35 On initialization, this class will scan those directories, and open and
36 read the Python files, looking for a class called Migration, which should
37 inherit from plain.models.migrations.Migration. See
38 plain.models.migrations.migration for what that looks like.
39
    Some migrations will be marked as "replacing" another set of migrations.
    These are loaded into a separate set of migrations away from the main ones.
    If the migrations they replace are either all applied or all unapplied
    (including when they are missing from disk), then they are injected into
    the main set, replacing the named migrations. Any dependency pointers to
    the replaced migrations are re-pointed to the new migration. A replacing
    migration whose targets are only partially applied is removed from the
    graph instead.
46
47 This does mean that this class MUST also talk to the database as well as
48 to disk, but this is probably fine. We're already not just operating
49 in memory.
50 """
51
52 def __init__(
53 self,
54 connection: BaseDatabaseWrapper | DatabaseConnection | None,
55 load: bool = True,
56 ignore_no_migrations: bool = False,
57 replace_migrations: bool = True,
58 ):
59 self.connection = connection
60 self.disk_migrations: dict[tuple[str, str], Migration] | None = None
61 self.applied_migrations: dict[tuple[str, str], Any] | None = None
62 self.ignore_no_migrations = ignore_no_migrations
63 self.replace_migrations = replace_migrations
64 self.unmigrated_packages: set[str]
65 self.migrated_packages: set[str]
66 self.graph: MigrationGraph
67 self.replacements: dict[tuple[str, str], Migration]
68 if load:
69 self.build_graph()
70
71 @classmethod
72 def migrations_module(cls, package_label: str) -> tuple[str | None, bool]:
73 """
74 Return the path to the migrations module for the specified package_label
75 and a boolean indicating if the module is specified in
76 settings.MIGRATION_MODULE.
77 """
78
79 # This package (plain-models) has different code under migrations/
80 if package_label == "plainmodels":
81 return None, True
82
83 app = packages_registry.get_package_config(package_label)
84 return f"{app.name}.{MIGRATIONS_MODULE_NAME}", False
85
    def load_disk(self) -> None:
        """
        Load the migrations of every registered package from disk.

        Resets and repopulates three attributes:

        * ``disk_migrations``: maps ``(package_label, migration_name)`` to an
          instance of that module's ``Migration`` class.
        * ``migrated_packages``: labels whose migrations package was imported.
        * ``unmigrated_packages``: labels with no usable migrations package
          (no module path, module missing, a plain module rather than a
          package, or a namespace package).

        Raises:
            ModuleNotFoundError: if importing a migrations package fails for a
                reason not covered by the "unmigrated" rules below.
            ImportError: if an individual migration module cannot be imported
                (re-raised with a clearer message for stale ``.pyc`` files).
            BadMigrationError: if a migration module has no ``Migration``
                attribute.
        """
        self.disk_migrations = {}
        self.unmigrated_packages = set()
        self.migrated_packages = set()
        for package_config in packages_registry.get_package_configs():
            # Get the migrations module directory
            module_name, explicit = self.migrations_module(package_config.package_label)
            if module_name is None:
                self.unmigrated_packages.add(package_config.package_label)
                continue
            # Remember whether the module was imported before this call so it
            # can be reloaded below.
            was_loaded = module_name in sys.modules
            try:
                module = import_module(module_name)
            except ModuleNotFoundError as e:
                # Treat the package as unmigrated when either the module path
                # was explicit and missing migrations are tolerated, or the
                # module that failed to import is on the "migrations" path
                # itself. Any other missing module is a real error.
                if (explicit and self.ignore_no_migrations) or (
                    not explicit
                    and e.name is not None
                    and MIGRATIONS_MODULE_NAME in e.name.split(".")
                ):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                raise
            else:
                # Module is not a package (e.g. migrations.py).
                if not hasattr(module, "__path__"):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                # Empty directories are namespaces. Namespace packages have no
                # __file__ and don't use a list for __path__. See
                # https://docs.python.org/3/reference/import.html#namespace-packages
                if getattr(module, "__file__", None) is None and not isinstance(
                    module.__path__, list
                ):
                    self.unmigrated_packages.add(package_config.package_label)
                    continue
                # Force a reload if it's already loaded (tests need this)
                if was_loaded:
                    reload(module)
            self.migrated_packages.add(package_config.package_label)
            # Candidate migrations are the non-package modules directly inside
            # the migrations package whose names don't start with "_" or "~".
            migration_names = {
                name
                for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
                if not is_pkg and name[0] not in "_~"
            }
            # Load migrations
            for migration_name in migration_names:
                migration_path = f"{module_name}.{migration_name}"
                try:
                    migration_module = import_module(migration_path)
                except ImportError as e:
                    # Give a more helpful message for stale bytecode; every
                    # other import failure propagates unchanged.
                    if "bad magic number" in str(e):
                        raise ImportError(
                            f"Couldn't import {migration_path!r} as it appears to be a stale "
                            ".pyc file."
                        ) from e
                    else:
                        raise
                if not hasattr(migration_module, "Migration"):
                    raise BadMigrationError(
                        f"Migration {migration_name} in app {package_config.package_label} has no Migration class"
                    )
                # Instantiate the module's Migration with its name and owning
                # package label, keyed the same way graph nodes are keyed.
                self.disk_migrations[package_config.package_label, migration_name] = (
                    migration_module.Migration(
                        migration_name,
                        package_config.package_label,
                    )
                )
154
155 def get_migration(self, package_label: str, name_prefix: str) -> Migration | None:
156 """Return the named migration or raise NodeNotFoundError."""
157 return self.graph.nodes[package_label, name_prefix]
158
159 def get_migration_by_prefix(
160 self, package_label: str, name_prefix: str
161 ) -> Migration:
162 """
163 Return the migration(s) which match the given app label and name_prefix.
164 """
165 # Do the search
166 assert self.disk_migrations is not None, "load_disk() must be called first"
167 results = []
168 for migration_package_label, migration_name in self.disk_migrations:
169 if migration_package_label == package_label and migration_name.startswith(
170 name_prefix
171 ):
172 results.append((migration_package_label, migration_name))
173 if len(results) > 1:
174 raise AmbiguityError(
175 f"There is more than one migration for '{package_label}' with the prefix '{name_prefix}'"
176 )
177 elif not results:
178 raise KeyError(
179 f"There is no migration for '{package_label}' with the prefix "
180 f"'{name_prefix}'"
181 )
182 else:
183 return self.disk_migrations[results[0]]
184
185 def check_key(
186 self, key: tuple[str, str], current_package: str
187 ) -> tuple[str, str] | None:
188 if (key[1] != "__first__" and key[1] != "__latest__") or key in self.graph:
189 return key
190 # Special-case __first__, which means "the first migration" for
191 # migrated packages, and is ignored for unmigrated packages. It allows
192 # makemigrations to declare dependencies on packages before they even have
193 # migrations.
194 if key[0] == current_package:
195 # Ignore __first__ references to the same app (#22325)
196 return None
197 if key[0] in self.unmigrated_packages:
198 # This app isn't migrated, but something depends on it.
199 # The models will get auto-added into the state, though
200 # so we're fine.
201 return None
202 if key[0] in self.migrated_packages:
203 try:
204 if key[1] == "__first__":
205 return self.graph.root_nodes(key[0])[0]
206 else: # "__latest__"
207 return self.graph.leaf_nodes(key[0])[0]
208 except IndexError:
209 if self.ignore_no_migrations:
210 return None
211 else:
212 raise ValueError(f"Dependency on app with no migrations: {key[0]}")
213 raise ValueError(f"Dependency on unknown app: {key[0]}")
214
215 def add_internal_dependencies(
216 self, key: tuple[str, str], migration: Migration
217 ) -> None:
218 """
219 Internal dependencies need to be added first to ensure `__first__`
220 dependencies find the correct root node.
221 """
222 for parent in migration.dependencies:
223 # Ignore __first__ references to the same app.
224 if parent[0] == key[0] and parent[1] != "__first__":
225 # Migration object is used only for error messages in add_dependency
226 self.graph.add_dependency(migration, key, parent, skip_validation=True)
227
228 def add_external_dependencies(
229 self, key: tuple[str, str], migration: Migration
230 ) -> None:
231 for parent in migration.dependencies:
232 # Skip internal dependencies
233 if key[0] == parent[0]:
234 continue
235 parent = self.check_key(parent, key[0])
236 if parent is not None:
237 # Migration object is used only for error messages in add_dependency
238 self.graph.add_dependency(migration, key, parent, skip_validation=True)
239
    def build_graph(self) -> None:
        """
        Build a migration dependency graph using both the disk and database.
        You'll need to rebuild the graph if you apply migrations. This isn't
        usually a problem as generally migration stuff runs in a one-shot process.

        Steps, in order:

        1. load_disk() populates ``disk_migrations``.
        2. ``applied_migrations`` is read through MigrationRecorder, or set to
           an empty dict when ``self.connection`` is None.
        3. A fresh MigrationGraph gets one node per disk migration; migrations
           with a truthy ``replaces`` are also recorded in ``replacements``.
        4. Internal dependencies are added for all migrations, then external
           ones.
        5. If ``replace_migrations`` is set, each replacing migration is either
           swapped in for its targets or removed from the graph.
        6. The graph is validated for consistency and then checked for cycles.

        Raises:
            NodeNotFoundError: if the graph is inconsistent; the message is
                expanded when the missing node belongs to a replacing
                migration that could not be used.
        """
        # Load disk data
        self.load_disk()
        assert self.disk_migrations is not None  # load_disk() ensures this
        # Load database data
        if self.connection is None:
            self.applied_migrations = {}
        else:
            recorder = MigrationRecorder(self.connection)
            self.applied_migrations = recorder.applied_migrations()
        # To start, populate the migration graph with nodes for ALL migrations
        # and their dependencies. Also make note of replacing migrations at this step.
        self.graph = MigrationGraph()
        self.replacements = {}
        for key, migration in self.disk_migrations.items():
            self.graph.add_node(key, migration)
            # Replacing migrations.
            if migration.replaces:
                self.replacements[key] = migration
        # All nodes must exist before any edges are added, hence the separate
        # passes over disk_migrations.
        for key, migration in self.disk_migrations.items():
            # Internal (same app) dependencies.
            self.add_internal_dependencies(key, migration)
        # Add external dependencies now that the internal ones have been resolved.
        for key, migration in self.disk_migrations.items():
            self.add_external_dependencies(key, migration)
        # Carry out replacements where possible and if enabled.
        if self.replace_migrations:
            for key, migration in self.replacements.items():
                # Get applied status of each of this migration's replacement
                # targets.
                applied_statuses = [
                    (target in self.applied_migrations) for target in migration.replaces
                ]
                # The replacing migration is only marked as applied if all of
                # its replacement targets are.
                if all(applied_statuses):
                    self.applied_migrations[key] = migration
                else:
                    self.applied_migrations.pop(key, None)
                # A replacing migration can be used if either all or none of
                # its replacement targets have been applied.
                if all(applied_statuses) or (not any(applied_statuses)):
                    self.graph.remove_replaced_nodes(key, migration.replaces)
                else:
                    # This replacing migration cannot be used because it is
                    # partially applied. Remove it from the graph and remap
                    # dependencies to it (#25945).
                    self.graph.remove_replacement_node(key, migration.replaces)
        # Ensure the graph is consistent.
        try:
            self.graph.validate_consistency()
        except NodeNotFoundError as exc:
            # Check if the missing node could have been replaced by any squash
            # migration but wasn't because the squash migration was partially
            # applied before. In that case raise a more understandable exception
            # (#23556).
            # Get reverse replacements.
            # Maps each replaced key to the set of replacing-migration keys
            # that name it in their `replaces`.
            reverse_replacements = {}
            for key, migration in self.replacements.items():
                for replaced in migration.replaces:
                    reverse_replacements.setdefault(replaced, set()).add(key)
            # Try to reraise exception with more detail.
            if exc.node in reverse_replacements:
                # The default passed to .get() is never used here because
                # membership was just checked.
                candidates = reverse_replacements.get(exc.node, set())
                is_replaced = any(
                    candidate in self.graph.nodes for candidate in candidates
                )
                if not is_replaced:
                    tries = ", ".join("{}.{}".format(*c) for c in candidates)
                    raise NodeNotFoundError(
                        f"Migration {exc.origin} depends on nonexistent node ('{exc.node[0]}', '{exc.node[1]}'). "
                        f"Plain tried to replace migration {exc.node[0]}.{exc.node[1]} with any of [{tries}] "
                        "but wasn't able to because some of the replaced migrations "
                        "are already applied.",
                        exc.node,
                    ) from exc
            # No better explanation is available: re-raise the original error.
            raise
        self.graph.ensure_not_cyclic()
323
324 def check_consistent_history(
325 self, connection: BaseDatabaseWrapper | DatabaseConnection
326 ) -> None:
327 """
328 Raise InconsistentMigrationHistory if any applied migrations have
329 unapplied dependencies.
330 """
331 recorder = MigrationRecorder(connection)
332 applied = recorder.applied_migrations()
333 for migration in applied:
334 # If the migration is unknown, skip it.
335 if migration not in self.graph.nodes:
336 continue
337 for parent in self.graph.node_map[migration].parents:
338 if parent not in applied:
339 # Skip unapplied squashed migrations that have all of their
340 # `replaces` applied.
341 # Use parent.key for dict lookup (Node.__eq__ allows `in` check)
342 if parent.key in self.replacements:
343 if all(
344 m in applied for m in self.replacements[parent.key].replaces
345 ):
346 continue
347 raise InconsistentMigrationHistory(
348 f"Migration {migration[0]}.{migration[1]} is applied before its dependency "
349 f"{parent[0]}.{parent[1]} on the database."
350 )
351
352 def detect_conflicts(self) -> dict[str, list[str]]:
353 """
354 Look through the loaded graph and detect any conflicts - packages
355 with more than one leaf migration. Return a dict of the app labels
356 that conflict with the migration names that conflict.
357 """
358 seen_packages = {}
359 conflicting_packages = set()
360 for package_label, migration_name in self.graph.leaf_nodes():
361 if package_label in seen_packages:
362 conflicting_packages.add(package_label)
363 seen_packages.setdefault(package_label, set()).add(migration_name)
364 return {
365 package_label: sorted(seen_packages[package_label])
366 for package_label in conflicting_packages
367 }
368
369 def project_state(
370 self, nodes: tuple[str, str] | None = None, at_end: bool = True
371 ) -> Any:
372 """
373 Return a ProjectState object representing the most recent state
374 that the loaded migrations represent.
375
376 See graph.make_state() for the meaning of "nodes" and "at_end".
377 """
378 return self.graph.make_state(
379 nodes=nodes, at_end=at_end, real_packages=self.unmigrated_packages
380 )