1from __future__ import annotations
2
3import functools
4import re
5from graphlib import TopologicalSorter
6from typing import TYPE_CHECKING, Any
7
8from plain.postgres.fields import (
9 NOT_PROVIDED,
10 DateField,
11 DateTimeField,
12 Field,
13 TimeField,
14)
15from plain.postgres.fields.related import ManyToManyField, RelatedField
16from plain.postgres.fields.reverse_related import ManyToManyRel
17from plain.postgres.migrations import operations
18from plain.postgres.migrations.migration import Migration, SettingsTuple
19from plain.postgres.migrations.operations.models import AlterModelOptions
20from plain.postgres.migrations.optimizer import MigrationOptimizer
21from plain.postgres.migrations.questioner import MigrationQuestioner
22from plain.postgres.migrations.utils import (
23 COMPILED_REGEX_TYPE,
24 RegexObject,
25 resolve_relation,
26)
27from plain.runtime import settings
28
29if TYPE_CHECKING:
30 from plain.postgres.migrations.graph import MigrationGraph
31 from plain.postgres.migrations.operations.base import Operation
32 from plain.postgres.migrations.state import ProjectState
33
34
35class MigrationAutodetector:
36 """
37 Take a pair of ProjectStates and compare them to see what the first would
38 need doing to make it match the second (the second usually being the
39 project's current state).
40
41 Note that this naturally operates on entire projects at a time,
42 as it's likely that changes interact (for example, you can't
43 add a ForeignKeyField without having a migration to add the table it
44 depends on first). A user interface may offer single-app usage
45 if it wishes, with the caveat that it may not always be possible.
46 """
47
48 def __init__(
49 self,
50 from_state: ProjectState,
51 to_state: ProjectState,
52 questioner: MigrationQuestioner | None = None,
53 ):
54 self.from_state = from_state
55 self.to_state = to_state
56 self.questioner = questioner or MigrationQuestioner()
57 self.existing_packages = {app for app, model in from_state.models}
58
59 def changes(
60 self,
61 graph: MigrationGraph,
62 trim_to_packages: set[str] | None = None,
63 convert_packages: set[str] | None = None,
64 migration_name: str | None = None,
65 ) -> dict[str, list[Migration]]:
66 """
67 Main entry point to produce a list of applicable changes.
68 Take a graph to base names on and an optional set of packages
69 to try and restrict to (restriction is not guaranteed)
70 """
71 changes = self._detect_changes(convert_packages, graph)
72 changes = self.arrange_for_graph(changes, graph, migration_name)
73 if trim_to_packages:
74 changes = self._trim_to_packages(changes, trim_to_packages)
75 return changes
76
77 def deep_deconstruct(self, obj: Any) -> Any:
78 """
79 Recursive deconstruction for a field and its arguments.
80 Used for full comparison for rename/alter; sometimes a single-level
81 deconstruction will not compare correctly.
82 """
83 if isinstance(obj, list):
84 return [self.deep_deconstruct(value) for value in obj]
85 elif isinstance(obj, tuple):
86 return tuple(self.deep_deconstruct(value) for value in obj)
87 elif isinstance(obj, dict):
88 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
89 elif isinstance(obj, functools.partial):
90 return (
91 obj.func,
92 self.deep_deconstruct(obj.args),
93 self.deep_deconstruct(obj.keywords),
94 )
95 elif isinstance(obj, COMPILED_REGEX_TYPE):
96 return RegexObject(obj)
97 elif isinstance(obj, type):
98 # If this is a type that implements 'deconstruct' as an instance method,
99 # avoid treating this as being deconstructible itself - see #22951
100 return obj
101 elif hasattr(obj, "deconstruct"):
102 deconstructed = obj.deconstruct()
103 if isinstance(obj, Field):
104 # we have a field which also returns a name
105 deconstructed = deconstructed[1:]
106 path, args, kwargs = deconstructed
107 return (
108 path,
109 [self.deep_deconstruct(value) for value in args],
110 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
111 )
112 else:
113 return obj
114
115 def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
116 """
117 Return a definition of the fields that ignores field names and
118 what related fields actually relate to. Used for detecting renames (as
119 the related fields change during renames).
120 """
121 fields_def = []
122 for name, field in sorted(fields.items()):
123 deconstruction = self.deep_deconstruct(field)
124 if isinstance(field, RelatedField) and field.remote_field.model:
125 deconstruction[2].pop("to", None)
126 fields_def.append(deconstruction)
127 return fields_def
128
129 def _detect_changes(
130 self,
131 convert_packages: set[str] | None = None,
132 graph: MigrationGraph | None = None,
133 ) -> dict[str, list[Migration]]:
134 """
135 Return a dict of migration plans which will achieve the
136 change from from_state to to_state. The dict has app labels
137 as keys and a list of migrations as values.
138
139 The resulting migrations aren't specially named, but the names
140 do matter for dependencies inside the set.
141
142 convert_packages is the list of packages to convert to use migrations
143 (i.e. to make initial migrations for, in the usual case)
144
145 graph is an optional argument that, if provided, can help improve
146 dependency generation and avoid potential circular dependencies.
147 """
148 # The first phase is generating all the operations for each app
149 # and gathering them into a big per-app list.
150 # Then go through that list, order it, and split into migrations to
151 # resolve dependencies caused by M2Ms and FKs.
152 self.generated_operations = {}
153 self.altered_indexes = {}
154 self.altered_constraints = {}
155 self.renamed_fields = {}
156
157 # Prepare some old/new state and model lists, ignoring unmigrated packages.
158 self.old_model_keys = set()
159 self.new_model_keys = set()
160 for (package_label, model_name), model_state in self.from_state.models.items():
161 if package_label not in self.from_state.real_packages:
162 self.old_model_keys.add((package_label, model_name))
163
164 for (package_label, model_name), model_state in self.to_state.models.items():
165 if package_label not in self.from_state.real_packages or (
166 convert_packages and package_label in convert_packages
167 ):
168 self.new_model_keys.add((package_label, model_name))
169
170 self.from_state.resolve_fields_and_relations()
171 self.to_state.resolve_fields_and_relations()
172
173 # Renames have to come first
174 self.generate_renamed_models()
175
176 # Prepare lists of fields and generate through model map
177 self._prepare_field_lists()
178 self._generate_through_model_map()
179
180 # Generate non-rename model operations
181 self.generate_deleted_models()
182 self.generate_created_models()
183 self.generate_altered_options()
184
185 # Create the renamed fields and store them in self.renamed_fields.
186 # They are used by create_altered_indexes(), generate_altered_fields(),
187 # generate_removed_altered_index(), and
188 # generate_altered_index().
189 self.create_renamed_fields()
190 # Create the altered indexes and store them in self.altered_indexes.
191 # This avoids the same computation in generate_removed_indexes()
192 # and generate_added_indexes().
193 self.create_altered_indexes()
194 self.create_altered_constraints()
195 # Generate index removal operations before field is removed
196 self.generate_removed_constraints()
197 self.generate_removed_indexes()
198 # Generate field renaming operations.
199 self.generate_renamed_fields()
200 self.generate_renamed_indexes()
201 # Generate field operations.
202 self.generate_removed_fields()
203 self.generate_added_fields()
204 self.generate_altered_fields()
205 self.generate_added_indexes()
206 self.generate_added_constraints()
207 self.generate_altered_db_table()
208
209 self._sort_migrations()
210 self._build_migration_list(graph)
211 self._optimize_migrations()
212
213 return self.migrations
214
215 def _prepare_field_lists(self) -> None:
216 """
217 Prepare field lists and a list of the fields that used through models
218 in the old state so dependencies can be made from the through model
219 deletion to the field that uses it.
220 """
221 self.kept_model_keys = self.old_model_keys & self.new_model_keys
222 self.through_users = {}
223 self.old_field_keys = {
224 (package_label, model_name, field_name)
225 for package_label, model_name in self.kept_model_keys
226 for field_name in self.from_state.models[
227 package_label,
228 self.renamed_models.get((package_label, model_name), model_name),
229 ].fields
230 }
231 self.new_field_keys = {
232 (package_label, model_name, field_name)
233 for package_label, model_name in self.kept_model_keys
234 for field_name in self.to_state.models[package_label, model_name].fields
235 }
236
237 def _generate_through_model_map(self) -> None:
238 """Through model map generation."""
239 for package_label, model_name in sorted(self.old_model_keys):
240 old_model_name = self.renamed_models.get(
241 (package_label, model_name), model_name
242 )
243 old_model_state = self.from_state.models[package_label, old_model_name]
244 for field_name, field in old_model_state.fields.items():
245 if hasattr(field, "remote_field") and getattr(
246 field.remote_field, "through", None
247 ):
248 through_key = resolve_relation(
249 field.remote_field.through, # type: ignore[unresolved-attribute]
250 package_label,
251 model_name,
252 )
253 self.through_users[through_key] = (
254 package_label,
255 old_model_name,
256 field_name,
257 )
258
259 @staticmethod
260 def _resolve_dependency(
261 dependency: tuple[str, str, str | None, bool | str],
262 ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
263 """
264 Return the resolved dependency and a boolean denoting whether or not
265 it was a settings dependency.
266 """
267 if not isinstance(dependency, SettingsTuple):
268 return dependency, False
269 resolved_package_label, resolved_object_name = getattr(
270 settings, dependency[1]
271 ).split(".")
272 return (resolved_package_label, resolved_object_name.lower()) + dependency[
273 2:
274 ], True
275
276 def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
277 """
278 Chop the lists of operations up into migrations with dependencies on
279 each other. Do this by going through an app's list of operations until
280 one is found that has an outgoing dependency that isn't in another
281 app's migration yet (hasn't been chopped off its list). Then chop off
282 the operations before it into a migration and move onto the next app.
283 If the loops completes without doing anything, there's a circular
284 dependency (which _should_ be impossible as the operations are
285 all split at this point so they can't depend and be depended on).
286 """
287 self.migrations = {}
288 num_ops = sum(len(x) for x in self.generated_operations.values())
289 chop_mode = False
290 while num_ops:
291 # On every iteration, we step through all the packages and see if there
292 # is a completed set of operations.
293 # If we find that a subset of the operations are complete we can
294 # try to chop it off from the rest and continue, but we only
295 # do this if we've already been through the list once before
296 # without any chopping and nothing has changed.
297 for package_label in sorted(self.generated_operations):
298 chopped = []
299 dependencies = set()
300 for operation in list(self.generated_operations[package_label]):
301 deps_satisfied = True
302 operation_dependencies = set()
303 for dep in operation._auto_deps:
304 # Temporarily resolve the settings dependency to
305 # prevent circular references. While keeping the
306 # dependency checks on the resolved model, add the
307 # settings dependencies.
308 original_dep = dep
309 dep, is_settings_dep = self._resolve_dependency(dep)
310 if dep[0] != package_label:
311 # External app dependency. See if it's not yet
312 # satisfied.
313 for other_operation in self.generated_operations.get(
314 dep[0], []
315 ):
316 if self.check_dependency(other_operation, dep):
317 deps_satisfied = False
318 break
319 if not deps_satisfied:
320 break
321 else:
322 if is_settings_dep:
323 operation_dependencies.add(
324 (original_dep[0], original_dep[1])
325 )
326 elif dep[0] in self.migrations:
327 operation_dependencies.add(
328 (dep[0], self.migrations[dep[0]][-1].name)
329 )
330 else:
331 # If we can't find the other app, we add a
332 # first/last dependency, but only if we've
333 # already been through once and checked
334 # everything.
335 if chop_mode:
336 # If the app already exists, we add a
337 # dependency on the last migration, as
338 # we don't know which migration
339 # contains the target field. If it's
340 # not yet migrated or has no
341 # migrations, we use __first__.
342 if graph and graph.leaf_nodes(dep[0]):
343 operation_dependencies.add(
344 graph.leaf_nodes(dep[0])[0]
345 )
346 else:
347 operation_dependencies.add(
348 (dep[0], "__first__")
349 )
350 else:
351 deps_satisfied = False
352 if deps_satisfied:
353 chopped.append(operation)
354 dependencies.update(operation_dependencies)
355 del self.generated_operations[package_label][0]
356 else:
357 break
358 # Make a migration! Well, only if there's stuff to put in it
359 if dependencies or chopped:
360 if not self.generated_operations[package_label] or chop_mode:
361 subclass = type(
362 "Migration",
363 (Migration,),
364 {"operations": [], "dependencies": []},
365 )
366 instance = subclass(
367 "auto_%i" # noqa: UP031
368 % (len(self.migrations.get(package_label, [])) + 1),
369 package_label,
370 )
371 instance.dependencies = list(dependencies)
372 instance.operations = chopped
373 instance.initial = package_label not in self.existing_packages
374 self.migrations.setdefault(package_label, []).append(instance)
375 chop_mode = False
376 else:
377 self.generated_operations[package_label] = (
378 chopped + self.generated_operations[package_label]
379 )
380 new_num_ops = sum(len(x) for x in self.generated_operations.values())
381 if new_num_ops == num_ops:
382 if not chop_mode:
383 chop_mode = True
384 else:
385 raise ValueError(
386 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
387 )
388 num_ops = new_num_ops
389
390 def _sort_migrations(self) -> None:
391 """
392 Reorder to make things possible. Reordering may be needed so FKs work
393 nicely inside the same app.
394 """
395 for package_label, ops in sorted(self.generated_operations.items()):
396 ts = TopologicalSorter()
397 for op in ops:
398 ts.add(op)
399 for dep in op._auto_deps:
400 # Resolve intra-app dependencies to handle circular
401 # references involving a settings model.
402 dep = self._resolve_dependency(dep)[0]
403 if dep[0] != package_label:
404 continue
405 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
406 self.generated_operations[package_label] = list(ts.static_order())
407
408 def _optimize_migrations(self) -> None:
409 # Add in internal dependencies among the migrations
410 for package_label, migrations in self.migrations.items():
411 for m1, m2 in zip(migrations, migrations[1:]):
412 m2.dependencies.append((package_label, m1.name))
413
414 # De-dupe dependencies
415 for migrations in self.migrations.values():
416 for migration in migrations:
417 migration.dependencies = list(set(migration.dependencies))
418
419 # Optimize migrations
420 for package_label, migrations in self.migrations.items():
421 for migration in migrations:
422 migration.operations = MigrationOptimizer().optimize(
423 migration.operations, package_label
424 )
425
426 def check_dependency(
427 self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
428 ) -> bool:
429 """
430 Return True if the given operation depends on the given dependency,
431 False otherwise.
432 """
433 # Created model
434 if dependency[2] is None and dependency[3] is True:
435 return (
436 isinstance(operation, operations.CreateModel)
437 and operation.name_lower == dependency[1].lower()
438 )
439 # Created field
440 elif dependency[2] is not None and dependency[3] is True:
441 return (
442 isinstance(operation, operations.CreateModel)
443 and operation.name_lower == dependency[1].lower()
444 and any(dependency[2] == x for x, y in operation.fields)
445 ) or (
446 isinstance(operation, operations.AddField)
447 and operation.model_name_lower == dependency[1].lower()
448 and operation.name_lower == dependency[2].lower()
449 )
450 # Removed field
451 elif dependency[2] is not None and dependency[3] is False:
452 return (
453 isinstance(operation, operations.RemoveField)
454 and operation.model_name_lower == dependency[1].lower()
455 and operation.name_lower == dependency[2].lower()
456 )
457 # Removed model
458 elif dependency[2] is None and dependency[3] is False:
459 return (
460 isinstance(operation, operations.DeleteModel)
461 and operation.name_lower == dependency[1].lower()
462 )
463 # Field being altered
464 elif dependency[2] is not None and dependency[3] == "alter":
465 return (
466 isinstance(operation, operations.AlterField)
467 and operation.model_name_lower == dependency[1].lower()
468 and operation.name_lower == dependency[2].lower()
469 )
470 # Unknown dependency. Raise an error.
471 else:
472 raise ValueError(f"Can't handle dependency {dependency!r}")
473
474 def add_operation(
475 self,
476 package_label: str,
477 operation: Operation,
478 dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
479 beginning: bool = False,
480 ) -> None:
481 # Operation dependencies are 4-element tuples:
482 # (package_label, model_name, field_name, create/delete as True/False or "alter")
483 operation._auto_deps = dependencies or []
484 if beginning:
485 self.generated_operations.setdefault(package_label, []).insert(0, operation)
486 else:
487 self.generated_operations.setdefault(package_label, []).append(operation)
488
489 def generate_renamed_models(self) -> None:
490 """
491 Find any renamed models, generate the operations for them, and remove
492 the old entry from the model lists. Must be run before other
493 model-level generation.
494 """
495 self.renamed_models = {}
496 self.renamed_models_rel = {}
497 added_models = self.new_model_keys - self.old_model_keys
498 for package_label, model_name in sorted(added_models):
499 model_state = self.to_state.models[package_label, model_name]
500 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
501
502 removed_models = self.old_model_keys - self.new_model_keys
503 for rem_package_label, rem_model_name in removed_models:
504 if rem_package_label == package_label:
505 rem_model_state = self.from_state.models[
506 rem_package_label, rem_model_name
507 ]
508 rem_model_fields_def = self.only_relation_agnostic_fields(
509 rem_model_state.fields
510 )
511 if model_fields_def == rem_model_fields_def:
512 if self.questioner.ask_rename_model(
513 rem_model_state, model_state
514 ):
515 dependencies = []
516 fields = list(model_state.fields.values()) + [
517 field.remote_field
518 for relations in self.to_state.relations[
519 package_label, model_name
520 ].values()
521 for field in relations.values()
522 if isinstance(field, RelatedField)
523 ]
524 for field in fields:
525 if isinstance(field, RelatedField):
526 dependencies.extend(
527 self._get_dependencies_for_foreign_key(
528 package_label,
529 model_name,
530 field,
531 self.to_state,
532 )
533 )
534 self.add_operation(
535 package_label,
536 operations.RenameModel(
537 old_name=rem_model_state.name,
538 new_name=model_state.name,
539 ),
540 dependencies=dependencies,
541 )
542 self.renamed_models[package_label, model_name] = (
543 rem_model_name
544 )
545 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
546 self.renamed_models_rel[renamed_models_rel_key] = (
547 f"{model_state.package_label}.{model_state.name_lower}"
548 )
549 self.old_model_keys.remove(
550 (rem_package_label, rem_model_name)
551 )
552 self.old_model_keys.add((package_label, model_name))
553 break
554
555 def generate_created_models(self) -> None:
556 """
557 Find all new models and make create
558 operations for them as well as separate operations to create any
559 foreign key or M2M relationships (these are optimized later, if
560 possible).
561
562 Defer any model options that refer to collections of fields that might
563 be deferred.
564 """
565 added_models = self.new_model_keys - self.old_model_keys
566
567 for package_label, model_name in added_models:
568 model_state = self.to_state.models[package_label, model_name]
569 # Gather related fields
570 related_fields = {}
571 primary_key_rel = None
572 for field_name, field in model_state.fields.items():
573 if isinstance(field, RelatedField):
574 if field.remote_field.model:
575 if field.primary_key:
576 primary_key_rel = field.remote_field.model
577 else:
578 related_fields[field_name] = field
579 if isinstance(field.remote_field, ManyToManyRel):
580 related_fields[field_name] = field
581
582 # Are there indexes to defer?
583 indexes = model_state.options.pop("indexes")
584 constraints = model_state.options.pop("constraints")
585 # Depend on the deletion of any possible proxy version of us
586 dependencies: list[tuple[str, str, str | None, bool | str]] = [
587 (package_label, model_name, None, False),
588 ]
589 # Depend on all bases
590 for base in model_state.bases:
591 if isinstance(base, str) and "." in base:
592 base_package_label, base_name = base.split(".", 1)
593 dependencies.append((base_package_label, base_name, None, True))
594 # Depend on the removal of base fields if the new model has
595 # a field with the same name.
596 old_base_model_state = self.from_state.models.get(
597 (base_package_label, base_name)
598 )
599 new_base_model_state = self.to_state.models.get(
600 (base_package_label, base_name)
601 )
602 if old_base_model_state and new_base_model_state:
603 removed_base_fields = (
604 set(old_base_model_state.fields)
605 .difference(
606 new_base_model_state.fields,
607 )
608 .intersection(model_state.fields)
609 )
610 for removed_base_field in removed_base_fields:
611 dependencies.append(
612 (
613 base_package_label,
614 base_name,
615 removed_base_field,
616 False,
617 )
618 )
619 # Depend on the other end of the primary key if it's a relation
620 if primary_key_rel:
621 dep_pl, dep_mn = resolve_relation(
622 primary_key_rel,
623 package_label,
624 model_name,
625 )
626 dependencies.append((dep_pl, dep_mn, None, True))
627 # Generate creation operation
628 self.add_operation(
629 package_label,
630 operations.CreateModel(
631 name=model_state.name,
632 fields=[
633 d
634 for d in model_state.fields.items()
635 if d[0] not in related_fields
636 ],
637 options=model_state.options,
638 bases=model_state.bases,
639 ),
640 dependencies=dependencies,
641 beginning=True,
642 )
643
644 # Generate operations for each related field
645 for name, field in sorted(related_fields.items()):
646 dependencies = self._get_dependencies_for_foreign_key(
647 package_label,
648 model_name,
649 field,
650 self.to_state,
651 )
652 # Depend on our own model being created
653 dependencies.append((package_label, model_name, None, True))
654 # Make operation
655 self.add_operation(
656 package_label,
657 operations.AddField(
658 model_name=model_name,
659 name=name,
660 field=field,
661 ),
662 dependencies=list(set(dependencies)),
663 )
664
665 related_dependencies: list[tuple[str, str, str | None, bool | str]] = [
666 (package_label, model_name, name, True)
667 for name in sorted(related_fields)
668 ]
669 related_dependencies.append((package_label, model_name, None, True))
670 for index in indexes:
671 self.add_operation(
672 package_label,
673 operations.AddIndex(
674 model_name=model_name,
675 index=index,
676 ),
677 dependencies=related_dependencies,
678 )
679 for constraint in constraints:
680 self.add_operation(
681 package_label,
682 operations.AddConstraint(
683 model_name=model_name,
684 constraint=constraint,
685 ),
686 dependencies=related_dependencies,
687 )
688
689 def generate_deleted_models(self) -> None:
690 """
691 Find all deleted models and make delete
692 operations for them as well as separate operations to delete any
693 foreign key or M2M relationships (these are optimized later, if
694 possible).
695
696 Also bring forward removal of any model options that refer to
697 collections of fields - the inverse of generate_created_models().
698 """
699 deleted_models = self.old_model_keys - self.new_model_keys
700
701 for package_label, model_name in sorted(deleted_models):
702 model_state = self.from_state.models[package_label, model_name]
703 # Gather related fields
704 related_fields = {}
705 for field_name, field in model_state.fields.items():
706 if isinstance(field, RelatedField):
707 if field.remote_field.model:
708 related_fields[field_name] = field
709 if isinstance(field.remote_field, ManyToManyRel):
710 related_fields[field_name] = field
711
712 # Then remove each related field
713 for name in sorted(related_fields):
714 self.add_operation(
715 package_label,
716 operations.RemoveField(
717 model_name=model_name,
718 name=name,
719 ),
720 )
721 # Finally, remove the model.
722 # This depends on both the removal/alteration of all incoming fields
723 # and the removal of all its own related fields, and if it's
724 # a through model the field that references it.
725 dependencies = []
726 relations = self.from_state.relations
727 for (
728 related_object_package_label,
729 object_name,
730 ), relation_related_fields in relations[package_label, model_name].items():
731 for field_name, field in relation_related_fields.items():
732 dependencies.append(
733 (related_object_package_label, object_name, field_name, False),
734 )
735 if not isinstance(field, ManyToManyField):
736 dependencies.append(
737 (
738 related_object_package_label,
739 object_name,
740 field_name,
741 "alter",
742 ),
743 )
744
745 for name in sorted(related_fields):
746 dependencies.append((package_label, model_name, name, False))
747 # We're referenced in another field's through=
748 through_user = self.through_users.get(
749 (package_label, model_state.name_lower)
750 )
751 if through_user:
752 dependencies.append(
753 (through_user[0], through_user[1], through_user[2], False)
754 )
755 # Finally, make the operation, deduping any dependencies
756 self.add_operation(
757 package_label,
758 operations.DeleteModel(
759 name=model_state.name,
760 ),
761 dependencies=list(set(dependencies)),
762 )
763
764 def create_renamed_fields(self) -> None:
765 """Work out renamed fields."""
766 self.renamed_operations = []
767 old_field_keys = self.old_field_keys.copy()
768 for package_label, model_name, field_name in sorted(
769 self.new_field_keys - old_field_keys
770 ):
771 old_model_name = self.renamed_models.get(
772 (package_label, model_name), model_name
773 )
774 old_model_state = self.from_state.models[package_label, old_model_name]
775 new_model_state = self.to_state.models[package_label, model_name]
776 field = new_model_state.get_field(field_name)
777 # Scan to see if this is actually a rename!
778 field_dec = self.deep_deconstruct(field)
779 for rem_package_label, rem_model_name, rem_field_name in sorted(
780 old_field_keys - self.new_field_keys
781 ):
782 if rem_package_label == package_label and rem_model_name == model_name:
783 old_field = old_model_state.get_field(rem_field_name)
784 old_field_dec = self.deep_deconstruct(old_field)
785 if (
786 isinstance(field, RelatedField)
787 and field.remote_field
788 and field.remote_field.model
789 and "to" in old_field_dec[2]
790 ):
791 old_rel_to = old_field_dec[2]["to"]
792 if old_rel_to in self.renamed_models_rel:
793 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
794 old_field.set_attributes_from_name(rem_field_name)
795 if old_field_dec == field_dec:
796 if self.questioner.ask_rename(
797 model_name, rem_field_name, field_name, field
798 ):
799 self.renamed_operations.append(
800 (
801 rem_package_label,
802 rem_model_name,
803 rem_field_name,
804 package_label,
805 model_name,
806 field,
807 field_name,
808 )
809 )
810 old_field_keys.remove(
811 (rem_package_label, rem_model_name, rem_field_name)
812 )
813 old_field_keys.add((package_label, model_name, field_name))
814 self.renamed_fields[
815 package_label, model_name, field_name
816 ] = rem_field_name
817 break
818
819 def generate_renamed_fields(self) -> None:
820 """Generate RenameField operations."""
821 for (
822 rem_package_label,
823 rem_model_name,
824 rem_field_name,
825 package_label,
826 model_name,
827 field,
828 field_name,
829 ) in self.renamed_operations:
830 self.add_operation(
831 package_label,
832 operations.RenameField(
833 model_name=model_name,
834 old_name=rem_field_name,
835 new_name=field_name,
836 ),
837 )
838 self.old_field_keys.remove(
839 (rem_package_label, rem_model_name, rem_field_name)
840 )
841 self.old_field_keys.add((package_label, model_name, field_name))
842
843 def generate_added_fields(self) -> None:
844 """Make AddField operations."""
845 for package_label, model_name, field_name in sorted(
846 self.new_field_keys - self.old_field_keys
847 ):
848 self._generate_added_field(package_label, model_name, field_name)
849
850 def _generate_added_field(
851 self, package_label: str, model_name: str, field_name: str
852 ) -> None:
853 field = self.to_state.models[package_label, model_name].get_field(field_name)
854 # Adding a field always depends at least on its removal.
855 dependencies: list[tuple[str, str, str | None, bool | str]] = [
856 (package_label, model_name, field_name, False)
857 ]
858 # Fields that are foreignkeys/m2ms depend on stuff.
859 if isinstance(field, RelatedField) and field.remote_field.model:
860 dependencies.extend(
861 self._get_dependencies_for_foreign_key(
862 package_label,
863 model_name,
864 field,
865 self.to_state,
866 )
867 )
868 # You can't just add NOT NULL fields with no default or fields
869 # which don't allow empty strings as default.
870 time_fields = (DateField, DateTimeField, TimeField)
871 preserve_default = (
872 field.allow_null
873 or field.has_default()
874 or isinstance(field, ManyToManyField)
875 or (not field.required and field.empty_strings_allowed)
876 or (isinstance(field, time_fields) and field.auto_now)
877 )
878 if not preserve_default:
879 field = field.clone()
880 if isinstance(field, time_fields) and field.auto_now_add:
881 field.default = self.questioner.ask_auto_now_add_addition(
882 field_name, model_name
883 )
884 else:
885 field.default = self.questioner.ask_not_null_addition(
886 field_name, model_name
887 )
888 if (
889 field.primary_key
890 and field.default is not NOT_PROVIDED
891 and callable(field.default)
892 ):
893 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
894 self.add_operation(
895 package_label,
896 operations.AddField(
897 model_name=model_name,
898 name=field_name,
899 field=field,
900 preserve_default=preserve_default,
901 ),
902 dependencies=dependencies,
903 )
904
905 def generate_removed_fields(self) -> None:
906 """Make RemoveField operations."""
907 for package_label, model_name, field_name in sorted(
908 self.old_field_keys - self.new_field_keys
909 ):
910 self._generate_removed_field(package_label, model_name, field_name)
911
912 def _generate_removed_field(
913 self, package_label: str, model_name: str, field_name: str
914 ) -> None:
915 self.add_operation(
916 package_label,
917 operations.RemoveField(
918 model_name=model_name,
919 name=field_name,
920 ),
921 )
922
923 def generate_altered_fields(self) -> None:
924 """
925 Make AlterField operations, or possibly RemovedField/AddField if alter
926 isn't possible.
927 """
928 for package_label, model_name, field_name in sorted(
929 self.old_field_keys & self.new_field_keys
930 ):
931 # Did the field change?
932 old_model_name = self.renamed_models.get(
933 (package_label, model_name), model_name
934 )
935 old_field_name = self.renamed_fields.get(
936 (package_label, model_name, field_name), field_name
937 )
938 old_field = self.from_state.models[package_label, old_model_name].get_field(
939 old_field_name
940 )
941 new_field = self.to_state.models[package_label, model_name].get_field(
942 field_name
943 )
944 dependencies: list[tuple[str, str, str | None, bool | str]] = []
945 # Implement any model renames on relations; these are handled by RenameModel
946 # so we need to exclude them from the comparison
947 if hasattr(new_field, "remote_field") and getattr(
948 new_field.remote_field, "model", None
949 ):
950 rename_key = resolve_relation(
951 new_field.remote_field.model, # type: ignore[unresolved-attribute]
952 package_label,
953 model_name,
954 )
955 if rename_key in self.renamed_models:
956 new_field.remote_field.model = old_field.remote_field.model # type: ignore[unresolved-attribute]
957 # Handle ForeignKeyField which can only have a single to_field.
958 remote_field_name = getattr(new_field.remote_field, "field_name", None)
959 if remote_field_name:
960 to_field_rename_key = rename_key + (remote_field_name,)
961 if to_field_rename_key in self.renamed_fields:
962 # Repoint model name only
963 new_field.remote_field.model = old_field.remote_field.model # type: ignore[unresolved-attribute]
964 dependencies.extend(
965 self._get_dependencies_for_foreign_key(
966 package_label,
967 model_name,
968 new_field,
969 self.to_state,
970 )
971 )
972 if hasattr(new_field, "remote_field") and getattr(
973 new_field.remote_field, "through", None
974 ):
975 rename_key = resolve_relation(
976 new_field.remote_field.through, # type: ignore[unresolved-attribute]
977 package_label,
978 model_name,
979 )
980 if rename_key in self.renamed_models:
981 new_field.remote_field.through = old_field.remote_field.through # type: ignore[unresolved-attribute]
982 old_field_dec = self.deep_deconstruct(old_field)
983 new_field_dec = self.deep_deconstruct(new_field)
984 if old_field_dec != new_field_dec and old_field_name == field_name:
985 both_m2m = isinstance(old_field, ManyToManyField) and isinstance(
986 new_field, ManyToManyField
987 )
988 neither_m2m = not isinstance(
989 old_field, ManyToManyField
990 ) and not isinstance(new_field, ManyToManyField)
991 if both_m2m or neither_m2m:
992 # Either both fields are m2m or neither is
993 preserve_default = True
994 if (
995 old_field.allow_null
996 and not new_field.allow_null
997 and not new_field.has_default()
998 and not isinstance(new_field, ManyToManyField)
999 ):
1000 field = new_field.clone()
1001 new_default = self.questioner.ask_not_null_alteration(
1002 field_name, model_name
1003 )
1004 if new_default is not NOT_PROVIDED:
1005 field.default = new_default
1006 preserve_default = False
1007 else:
1008 field = new_field
1009 self.add_operation(
1010 package_label,
1011 operations.AlterField(
1012 model_name=model_name,
1013 name=field_name,
1014 field=field,
1015 preserve_default=preserve_default,
1016 ),
1017 dependencies=dependencies,
1018 )
1019 else:
1020 # We cannot alter between m2m and concrete fields
1021 self._generate_removed_field(package_label, model_name, field_name)
1022 self._generate_added_field(package_label, model_name, field_name)
1023
1024 def create_altered_indexes(self) -> None:
1025 option_name = operations.AddIndex.option_name
1026
1027 for package_label, model_name in sorted(self.kept_model_keys):
1028 old_model_name = self.renamed_models.get(
1029 (package_label, model_name), model_name
1030 )
1031 old_model_state = self.from_state.models[package_label, old_model_name]
1032 new_model_state = self.to_state.models[package_label, model_name]
1033
1034 old_indexes = old_model_state.options[option_name]
1035 new_indexes = new_model_state.options[option_name]
1036 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1037 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1038 renamed_indexes = []
1039 # Find renamed indexes.
1040 remove_from_added = []
1041 remove_from_removed = []
1042 for new_index in added_indexes:
1043 new_index_dec = new_index.deconstruct()
1044 new_index_name = new_index_dec[2].pop("name")
1045 for old_index in removed_indexes:
1046 old_index_dec = old_index.deconstruct()
1047 old_index_name = old_index_dec[2].pop("name")
1048 # Indexes are the same except for the names.
1049 if (
1050 new_index_dec == old_index_dec
1051 and new_index_name != old_index_name
1052 ):
1053 renamed_indexes.append((old_index_name, new_index_name, None))
1054 remove_from_added.append(new_index)
1055 remove_from_removed.append(old_index)
1056
1057 # Remove renamed indexes from the lists of added and removed
1058 # indexes.
1059 added_indexes = [
1060 idx for idx in added_indexes if idx not in remove_from_added
1061 ]
1062 removed_indexes = [
1063 idx for idx in removed_indexes if idx not in remove_from_removed
1064 ]
1065
1066 self.altered_indexes.update(
1067 {
1068 (package_label, model_name): {
1069 "added_indexes": added_indexes,
1070 "removed_indexes": removed_indexes,
1071 "renamed_indexes": renamed_indexes,
1072 }
1073 }
1074 )
1075
1076 def generate_added_indexes(self) -> None:
1077 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1078 dependencies = self._get_dependencies_for_model(package_label, model_name)
1079 for index in alt_indexes["added_indexes"]:
1080 self.add_operation(
1081 package_label,
1082 operations.AddIndex(
1083 model_name=model_name,
1084 index=index,
1085 ),
1086 dependencies=dependencies,
1087 )
1088
1089 def generate_removed_indexes(self) -> None:
1090 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1091 for index in alt_indexes["removed_indexes"]:
1092 self.add_operation(
1093 package_label,
1094 operations.RemoveIndex(
1095 model_name=model_name,
1096 name=index.name,
1097 ),
1098 )
1099
1100 def generate_renamed_indexes(self) -> None:
1101 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1102 for old_index_name, new_index_name, old_fields in alt_indexes[
1103 "renamed_indexes"
1104 ]:
1105 self.add_operation(
1106 package_label,
1107 operations.RenameIndex(
1108 model_name=model_name,
1109 new_name=new_index_name,
1110 old_name=old_index_name,
1111 old_fields=old_fields,
1112 ),
1113 )
1114
1115 def create_altered_constraints(self) -> None:
1116 option_name = operations.AddConstraint.option_name
1117 for package_label, model_name in sorted(self.kept_model_keys):
1118 old_model_name = self.renamed_models.get(
1119 (package_label, model_name), model_name
1120 )
1121 old_model_state = self.from_state.models[package_label, old_model_name]
1122 new_model_state = self.to_state.models[package_label, model_name]
1123
1124 old_constraints = old_model_state.options[option_name]
1125 new_constraints = new_model_state.options[option_name]
1126 add_constraints = [c for c in new_constraints if c not in old_constraints]
1127 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1128
1129 self.altered_constraints.update(
1130 {
1131 (package_label, model_name): {
1132 "added_constraints": add_constraints,
1133 "removed_constraints": rem_constraints,
1134 }
1135 }
1136 )
1137
1138 def generate_added_constraints(self) -> None:
1139 for (
1140 package_label,
1141 model_name,
1142 ), alt_constraints in self.altered_constraints.items():
1143 dependencies = self._get_dependencies_for_model(package_label, model_name)
1144 for constraint in alt_constraints["added_constraints"]:
1145 self.add_operation(
1146 package_label,
1147 operations.AddConstraint(
1148 model_name=model_name,
1149 constraint=constraint,
1150 ),
1151 dependencies=dependencies,
1152 )
1153
1154 def generate_removed_constraints(self) -> None:
1155 for (
1156 package_label,
1157 model_name,
1158 ), alt_constraints in self.altered_constraints.items():
1159 for constraint in alt_constraints["removed_constraints"]:
1160 self.add_operation(
1161 package_label,
1162 operations.RemoveConstraint(
1163 model_name=model_name,
1164 name=constraint.name,
1165 ),
1166 )
1167
1168 @staticmethod
1169 def _get_dependencies_for_foreign_key(
1170 package_label: str, model_name: str, field: Field, project_state: ProjectState
1171 ) -> list[tuple[str, str, str | None, bool | str]]:
1172 remote_field_model = None
1173 if isinstance(field, RelatedField):
1174 remote_field_model = field.remote_field.model
1175 else:
1176 relations = project_state.relations[package_label, model_name]
1177 for (remote_package_label, remote_model_name), fields in relations.items():
1178 if any(
1179 field == related_field.remote_field
1180 for related_field in fields.values()
1181 if isinstance(related_field, RelatedField)
1182 ):
1183 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1184 break
1185 dep_package_label, dep_object_name = resolve_relation(
1186 remote_field_model,
1187 package_label,
1188 model_name,
1189 )
1190 dependencies: list[tuple[str, str, str | None, bool | str]] = [
1191 (dep_package_label, dep_object_name, None, True)
1192 ]
1193 if isinstance(field, RelatedField) and isinstance(
1194 field.remote_field, ManyToManyRel
1195 ):
1196 through_package_label, through_object_name = resolve_relation(
1197 field.remote_field.through,
1198 package_label,
1199 model_name,
1200 )
1201 dependencies.append(
1202 (through_package_label, through_object_name, None, True)
1203 )
1204 return dependencies
1205
1206 def _get_dependencies_for_model(
1207 self, package_label: str, model_name: str
1208 ) -> list[tuple[str, str, str | None, bool | str]]:
1209 """Return foreign key dependencies of the given model."""
1210 dependencies = []
1211 model_state = self.to_state.models[package_label, model_name]
1212 for field in model_state.fields.values():
1213 if isinstance(field, RelatedField):
1214 dependencies.extend(
1215 self._get_dependencies_for_foreign_key(
1216 package_label,
1217 model_name,
1218 field,
1219 self.to_state,
1220 )
1221 )
1222 return dependencies
1223
1224 def generate_altered_db_table(self) -> None:
1225 for package_label, model_name in sorted(self.kept_model_keys):
1226 old_model_name = self.renamed_models.get(
1227 (package_label, model_name), model_name
1228 )
1229 old_model_state = self.from_state.models[package_label, old_model_name]
1230 new_model_state = self.to_state.models[package_label, model_name]
1231 old_db_table_name = old_model_state.options.get("db_table")
1232 new_db_table_name = new_model_state.options.get("db_table")
1233 if old_db_table_name != new_db_table_name:
1234 self.add_operation(
1235 package_label,
1236 operations.AlterModelTable(
1237 name=model_name,
1238 table=new_db_table_name,
1239 ),
1240 )
1241
1242 def generate_altered_options(self) -> None:
1243 """
1244 Work out if any non-schema-affecting options have changed and make an
1245 operation to represent them in state changes (in case Python code in
1246 migrations needs them).
1247 """
1248 for package_label, model_name in sorted(self.kept_model_keys):
1249 old_model_name = self.renamed_models.get(
1250 (package_label, model_name), model_name
1251 )
1252 old_model_state = self.from_state.models[package_label, old_model_name]
1253 new_model_state = self.to_state.models[package_label, model_name]
1254 old_options = {
1255 key: value
1256 for key, value in old_model_state.options.items()
1257 if key in AlterModelOptions.ALTER_OPTION_KEYS
1258 }
1259 new_options = {
1260 key: value
1261 for key, value in new_model_state.options.items()
1262 if key in AlterModelOptions.ALTER_OPTION_KEYS
1263 }
1264 if old_options != new_options:
1265 self.add_operation(
1266 package_label,
1267 operations.AlterModelOptions(
1268 name=model_name,
1269 options=new_options,
1270 ),
1271 )
1272
1273 def arrange_for_graph(
1274 self,
1275 changes: dict[str, list[Migration]],
1276 graph: MigrationGraph,
1277 migration_name: str | None = None,
1278 ) -> dict[str, list[Migration]]:
1279 """
1280 Take a result from changes() and a MigrationGraph, and fix the names
1281 and dependencies of the changes so they extend the graph from the leaf
1282 nodes for each app.
1283 """
1284 leaves = graph.leaf_nodes()
1285 name_map = {}
1286 for package_label, migrations in list(changes.items()):
1287 if not migrations:
1288 continue
1289 # Find the app label's current leaf node
1290 app_leaf = None
1291 for leaf in leaves:
1292 if leaf[0] == package_label:
1293 app_leaf = leaf
1294 break
1295 # Do they want an initial migration for this app?
1296 if app_leaf is None and not self.questioner.ask_initial(package_label):
1297 # They don't.
1298 for migration in migrations:
1299 name_map[(package_label, migration.name)] = (
1300 package_label,
1301 "__first__",
1302 )
1303 del changes[package_label]
1304 continue
1305 # Work out the next number in the sequence
1306 if app_leaf is None:
1307 next_number = 1
1308 else:
1309 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1310 # Name each migration
1311 for i, migration in enumerate(migrations):
1312 if i == 0 and app_leaf:
1313 migration.dependencies.append(app_leaf)
1314 new_name_parts = ["%04i" % next_number] # noqa: UP031
1315 if migration_name:
1316 new_name_parts.append(migration_name)
1317 elif i == 0 and not app_leaf:
1318 new_name_parts.append("initial")
1319 else:
1320 new_name_parts.append(migration.suggest_name()[:100])
1321 new_name = "_".join(new_name_parts)
1322 name_map[(package_label, migration.name)] = (package_label, new_name)
1323 next_number += 1
1324 migration.name = new_name
1325 # Now fix dependencies
1326 for migrations in changes.values():
1327 for migration in migrations:
1328 migration.dependencies = [
1329 name_map.get(d, d) for d in migration.dependencies
1330 ]
1331 return changes
1332
1333 def _trim_to_packages(
1334 self, changes: dict[str, list[Migration]], package_labels: set[str]
1335 ) -> dict[str, list[Migration]]:
1336 """
1337 Take changes from arrange_for_graph() and set of app labels, and return
1338 a modified set of changes which trims out as many migrations that are
1339 not in package_labels as possible. Note that some other migrations may
1340 still be present as they may be required dependencies.
1341 """
1342 # Gather other app dependencies in a first pass
1343 app_dependencies = {}
1344 for package_label, migrations in changes.items():
1345 for migration in migrations:
1346 for dep_package_label, name in migration.dependencies:
1347 app_dependencies.setdefault(package_label, set()).add(
1348 dep_package_label
1349 )
1350 required_packages = set(package_labels)
1351 # Keep resolving till there's no change
1352 old_required_packages = None
1353 while old_required_packages != required_packages:
1354 old_required_packages = set(required_packages)
1355 required_packages.update(
1356 *[
1357 app_dependencies.get(package_label, ())
1358 for package_label in required_packages
1359 ]
1360 )
1361 # Remove all migrations that aren't needed
1362 for package_label in list(changes):
1363 if package_label not in required_packages:
1364 del changes[package_label]
1365 return changes
1366
1367 @classmethod
1368 def parse_number(cls, name: str) -> int | None:
1369 """
1370 Given a migration name, try to extract a number from the beginning of
1371 it. For a squashed migration such as '0001_squashed_0004…', return the
1372 second number. If no number is found, return None.
1373 """
1374 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1375 return int(squashed_match[1])
1376 match = re.match(r"^\d+", name)
1377 if match:
1378 return int(match[0])
1379 return None