1from __future__ import annotations
2
3import functools
4import re
5from graphlib import TopologicalSorter
6from typing import TYPE_CHECKING, Any
7
8from plain.models.fields import (
9 NOT_PROVIDED,
10 DateField,
11 DateTimeField,
12 Field,
13 TimeField,
14)
15from plain.models.migrations import operations
16from plain.models.migrations.migration import Migration, SettingsTuple
17from plain.models.migrations.operations.models import AlterModelOptions
18from plain.models.migrations.optimizer import MigrationOptimizer
19from plain.models.migrations.questioner import MigrationQuestioner
20from plain.models.migrations.utils import (
21 COMPILED_REGEX_TYPE,
22 RegexObject,
23 resolve_relation,
24)
25from plain.runtime import settings
26
27if TYPE_CHECKING:
28 from plain.models.migrations.graph import MigrationGraph
29 from plain.models.migrations.operations.base import Operation
30 from plain.models.migrations.state import ProjectState
31
32
33class MigrationAutodetector:
34 """
35 Take a pair of ProjectStates and compare them to see what the first would
36 need doing to make it match the second (the second usually being the
37 project's current state).
38
39 Note that this naturally operates on entire projects at a time,
40 as it's likely that changes interact (for example, you can't
41 add a ForeignKey without having a migration to add the table it
42 depends on first). A user interface may offer single-app usage
43 if it wishes, with the caveat that it may not always be possible.
44 """
45
46 def __init__(
47 self,
48 from_state: ProjectState,
49 to_state: ProjectState,
50 questioner: MigrationQuestioner | None = None,
51 ):
52 self.from_state = from_state
53 self.to_state = to_state
54 self.questioner = questioner or MigrationQuestioner()
55 self.existing_packages = {app for app, model in from_state.models}
56
57 def changes(
58 self,
59 graph: MigrationGraph,
60 trim_to_packages: set[str] | None = None,
61 convert_packages: set[str] | None = None,
62 migration_name: str | None = None,
63 ) -> dict[str, list[Migration]]:
64 """
65 Main entry point to produce a list of applicable changes.
66 Take a graph to base names on and an optional set of packages
67 to try and restrict to (restriction is not guaranteed)
68 """
69 changes = self._detect_changes(convert_packages, graph)
70 changes = self.arrange_for_graph(changes, graph, migration_name)
71 if trim_to_packages:
72 changes = self._trim_to_packages(changes, trim_to_packages)
73 return changes
74
75 def deep_deconstruct(self, obj: Any) -> Any:
76 """
77 Recursive deconstruction for a field and its arguments.
78 Used for full comparison for rename/alter; sometimes a single-level
79 deconstruction will not compare correctly.
80 """
81 if isinstance(obj, list):
82 return [self.deep_deconstruct(value) for value in obj]
83 elif isinstance(obj, tuple):
84 return tuple(self.deep_deconstruct(value) for value in obj)
85 elif isinstance(obj, dict):
86 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
87 elif isinstance(obj, functools.partial):
88 return (
89 obj.func,
90 self.deep_deconstruct(obj.args),
91 self.deep_deconstruct(obj.keywords),
92 )
93 elif isinstance(obj, COMPILED_REGEX_TYPE):
94 return RegexObject(obj)
95 elif isinstance(obj, type):
96 # If this is a type that implements 'deconstruct' as an instance method,
97 # avoid treating this as being deconstructible itself - see #22951
98 return obj
99 elif hasattr(obj, "deconstruct"):
100 deconstructed = obj.deconstruct()
101 if isinstance(obj, Field):
102 # we have a field which also returns a name
103 deconstructed = deconstructed[1:]
104 path, args, kwargs = deconstructed
105 return (
106 path,
107 [self.deep_deconstruct(value) for value in args],
108 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
109 )
110 else:
111 return obj
112
113 def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
114 """
115 Return a definition of the fields that ignores field names and
116 what related fields actually relate to. Used for detecting renames (as
117 the related fields change during renames).
118 """
119 fields_def = []
120 for name, field in sorted(fields.items()):
121 deconstruction = self.deep_deconstruct(field)
122 if field.remote_field and field.remote_field.model:
123 deconstruction[2].pop("to", None)
124 fields_def.append(deconstruction)
125 return fields_def
126
127 def _detect_changes(
128 self,
129 convert_packages: set[str] | None = None,
130 graph: MigrationGraph | None = None,
131 ) -> dict[str, list[Migration]]:
132 """
133 Return a dict of migration plans which will achieve the
134 change from from_state to to_state. The dict has app labels
135 as keys and a list of migrations as values.
136
137 The resulting migrations aren't specially named, but the names
138 do matter for dependencies inside the set.
139
140 convert_packages is the list of packages to convert to use migrations
141 (i.e. to make initial migrations for, in the usual case)
142
143 graph is an optional argument that, if provided, can help improve
144 dependency generation and avoid potential circular dependencies.
145 """
146 # The first phase is generating all the operations for each app
147 # and gathering them into a big per-app list.
148 # Then go through that list, order it, and split into migrations to
149 # resolve dependencies caused by M2Ms and FKs.
150 self.generated_operations = {}
151 self.altered_indexes = {}
152 self.altered_constraints = {}
153 self.renamed_fields = {}
154
155 # Prepare some old/new state and model lists, ignoring unmigrated packages.
156 self.old_model_keys = set()
157 self.new_model_keys = set()
158 for (package_label, model_name), model_state in self.from_state.models.items():
159 if package_label not in self.from_state.real_packages:
160 self.old_model_keys.add((package_label, model_name))
161
162 for (package_label, model_name), model_state in self.to_state.models.items():
163 if package_label not in self.from_state.real_packages or (
164 convert_packages and package_label in convert_packages
165 ):
166 self.new_model_keys.add((package_label, model_name))
167
168 self.from_state.resolve_fields_and_relations()
169 self.to_state.resolve_fields_and_relations()
170
171 # Renames have to come first
172 self.generate_renamed_models()
173
174 # Prepare lists of fields and generate through model map
175 self._prepare_field_lists()
176 self._generate_through_model_map()
177
178 # Generate non-rename model operations
179 self.generate_deleted_models()
180 self.generate_created_models()
181 self.generate_altered_options()
182 self.generate_altered_db_table_comment()
183
184 # Create the renamed fields and store them in self.renamed_fields.
185 # They are used by create_altered_indexes(), generate_altered_fields(),
186 # generate_removed_altered_index(), and
187 # generate_altered_index().
188 self.create_renamed_fields()
189 # Create the altered indexes and store them in self.altered_indexes.
190 # This avoids the same computation in generate_removed_indexes()
191 # and generate_added_indexes().
192 self.create_altered_indexes()
193 self.create_altered_constraints()
194 # Generate index removal operations before field is removed
195 self.generate_removed_constraints()
196 self.generate_removed_indexes()
197 # Generate field renaming operations.
198 self.generate_renamed_fields()
199 self.generate_renamed_indexes()
200 # Generate field operations.
201 self.generate_removed_fields()
202 self.generate_added_fields()
203 self.generate_altered_fields()
204 self.generate_added_indexes()
205 self.generate_added_constraints()
206 self.generate_altered_db_table()
207
208 self._sort_migrations()
209 self._build_migration_list(graph)
210 self._optimize_migrations()
211
212 return self.migrations
213
214 def _prepare_field_lists(self) -> None:
215 """
216 Prepare field lists and a list of the fields that used through models
217 in the old state so dependencies can be made from the through model
218 deletion to the field that uses it.
219 """
220 self.kept_model_keys = self.old_model_keys & self.new_model_keys
221 self.through_users = {}
222 self.old_field_keys = {
223 (package_label, model_name, field_name)
224 for package_label, model_name in self.kept_model_keys
225 for field_name in self.from_state.models[
226 package_label,
227 self.renamed_models.get((package_label, model_name), model_name),
228 ].fields
229 }
230 self.new_field_keys = {
231 (package_label, model_name, field_name)
232 for package_label, model_name in self.kept_model_keys
233 for field_name in self.to_state.models[package_label, model_name].fields
234 }
235
236 def _generate_through_model_map(self) -> None:
237 """Through model map generation."""
238 for package_label, model_name in sorted(self.old_model_keys):
239 old_model_name = self.renamed_models.get(
240 (package_label, model_name), model_name
241 )
242 old_model_state = self.from_state.models[package_label, old_model_name]
243 for field_name, field in old_model_state.fields.items():
244 if hasattr(field, "remote_field") and getattr(
245 field.remote_field, "through", None
246 ):
247 through_key = resolve_relation(
248 field.remote_field.through, package_label, model_name
249 )
250 self.through_users[through_key] = (
251 package_label,
252 old_model_name,
253 field_name,
254 )
255
256 @staticmethod
257 def _resolve_dependency(
258 dependency: tuple[str, str, str | None, bool | str],
259 ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
260 """
261 Return the resolved dependency and a boolean denoting whether or not
262 it was a settings dependency.
263 """
264 if not isinstance(dependency, SettingsTuple):
265 return dependency, False
266 resolved_package_label, resolved_object_name = getattr(
267 settings, dependency[1]
268 ).split(".")
269 return (resolved_package_label, resolved_object_name.lower()) + dependency[
270 2:
271 ], True
272
273 def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
274 """
275 Chop the lists of operations up into migrations with dependencies on
276 each other. Do this by going through an app's list of operations until
277 one is found that has an outgoing dependency that isn't in another
278 app's migration yet (hasn't been chopped off its list). Then chop off
279 the operations before it into a migration and move onto the next app.
280 If the loops completes without doing anything, there's a circular
281 dependency (which _should_ be impossible as the operations are
282 all split at this point so they can't depend and be depended on).
283 """
284 self.migrations = {}
285 num_ops = sum(len(x) for x in self.generated_operations.values())
286 chop_mode = False
287 while num_ops:
288 # On every iteration, we step through all the packages and see if there
289 # is a completed set of operations.
290 # If we find that a subset of the operations are complete we can
291 # try to chop it off from the rest and continue, but we only
292 # do this if we've already been through the list once before
293 # without any chopping and nothing has changed.
294 for package_label in sorted(self.generated_operations):
295 chopped = []
296 dependencies = set()
297 for operation in list(self.generated_operations[package_label]):
298 deps_satisfied = True
299 operation_dependencies = set()
300 for dep in operation._auto_deps:
301 # Temporarily resolve the settings dependency to
302 # prevent circular references. While keeping the
303 # dependency checks on the resolved model, add the
304 # settings dependencies.
305 original_dep = dep
306 dep, is_settings_dep = self._resolve_dependency(dep)
307 if dep[0] != package_label:
308 # External app dependency. See if it's not yet
309 # satisfied.
310 for other_operation in self.generated_operations.get(
311 dep[0], []
312 ):
313 if self.check_dependency(other_operation, dep):
314 deps_satisfied = False
315 break
316 if not deps_satisfied:
317 break
318 else:
319 if is_settings_dep:
320 operation_dependencies.add(
321 (original_dep[0], original_dep[1])
322 )
323 elif dep[0] in self.migrations:
324 operation_dependencies.add(
325 (dep[0], self.migrations[dep[0]][-1].name)
326 )
327 else:
328 # If we can't find the other app, we add a
329 # first/last dependency, but only if we've
330 # already been through once and checked
331 # everything.
332 if chop_mode:
333 # If the app already exists, we add a
334 # dependency on the last migration, as
335 # we don't know which migration
336 # contains the target field. If it's
337 # not yet migrated or has no
338 # migrations, we use __first__.
339 if graph and graph.leaf_nodes(dep[0]):
340 operation_dependencies.add(
341 graph.leaf_nodes(dep[0])[0]
342 )
343 else:
344 operation_dependencies.add(
345 (dep[0], "__first__")
346 )
347 else:
348 deps_satisfied = False
349 if deps_satisfied:
350 chopped.append(operation)
351 dependencies.update(operation_dependencies)
352 del self.generated_operations[package_label][0]
353 else:
354 break
355 # Make a migration! Well, only if there's stuff to put in it
356 if dependencies or chopped:
357 if not self.generated_operations[package_label] or chop_mode:
358 subclass = type(
359 "Migration",
360 (Migration,),
361 {"operations": [], "dependencies": []},
362 )
363 instance = subclass(
364 "auto_%i" # noqa: UP031
365 % (len(self.migrations.get(package_label, [])) + 1),
366 package_label,
367 )
368 instance.dependencies = list(dependencies)
369 instance.operations = chopped
370 instance.initial = package_label not in self.existing_packages
371 self.migrations.setdefault(package_label, []).append(instance)
372 chop_mode = False
373 else:
374 self.generated_operations[package_label] = (
375 chopped + self.generated_operations[package_label]
376 )
377 new_num_ops = sum(len(x) for x in self.generated_operations.values())
378 if new_num_ops == num_ops:
379 if not chop_mode:
380 chop_mode = True
381 else:
382 raise ValueError(
383 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
384 )
385 num_ops = new_num_ops
386
387 def _sort_migrations(self) -> None:
388 """
389 Reorder to make things possible. Reordering may be needed so FKs work
390 nicely inside the same app.
391 """
392 for package_label, ops in sorted(self.generated_operations.items()):
393 ts = TopologicalSorter()
394 for op in ops:
395 ts.add(op)
396 for dep in op._auto_deps:
397 # Resolve intra-app dependencies to handle circular
398 # references involving a settings model.
399 dep = self._resolve_dependency(dep)[0]
400 if dep[0] != package_label:
401 continue
402 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
403 self.generated_operations[package_label] = list(ts.static_order())
404
405 def _optimize_migrations(self) -> None:
406 # Add in internal dependencies among the migrations
407 for package_label, migrations in self.migrations.items():
408 for m1, m2 in zip(migrations, migrations[1:]):
409 m2.dependencies.append((package_label, m1.name))
410
411 # De-dupe dependencies
412 for migrations in self.migrations.values():
413 for migration in migrations:
414 migration.dependencies = list(set(migration.dependencies))
415
416 # Optimize migrations
417 for package_label, migrations in self.migrations.items():
418 for migration in migrations:
419 migration.operations = MigrationOptimizer().optimize(
420 migration.operations, package_label
421 )
422
423 def check_dependency(
424 self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
425 ) -> bool:
426 """
427 Return True if the given operation depends on the given dependency,
428 False otherwise.
429 """
430 # Created model
431 if dependency[2] is None and dependency[3] is True:
432 return (
433 isinstance(operation, operations.CreateModel)
434 and operation.name_lower == dependency[1].lower()
435 )
436 # Created field
437 elif dependency[2] is not None and dependency[3] is True:
438 return (
439 isinstance(operation, operations.CreateModel)
440 and operation.name_lower == dependency[1].lower()
441 and any(dependency[2] == x for x, y in operation.fields)
442 ) or (
443 isinstance(operation, operations.AddField)
444 and operation.model_name_lower == dependency[1].lower()
445 and operation.name_lower == dependency[2].lower()
446 )
447 # Removed field
448 elif dependency[2] is not None and dependency[3] is False:
449 return (
450 isinstance(operation, operations.RemoveField)
451 and operation.model_name_lower == dependency[1].lower()
452 and operation.name_lower == dependency[2].lower()
453 )
454 # Removed model
455 elif dependency[2] is None and dependency[3] is False:
456 return (
457 isinstance(operation, operations.DeleteModel)
458 and operation.name_lower == dependency[1].lower()
459 )
460 # Field being altered
461 elif dependency[2] is not None and dependency[3] == "alter":
462 return (
463 isinstance(operation, operations.AlterField)
464 and operation.model_name_lower == dependency[1].lower()
465 and operation.name_lower == dependency[2].lower()
466 )
467 # Unknown dependency. Raise an error.
468 else:
469 raise ValueError(f"Can't handle dependency {dependency!r}")
470
471 def add_operation(
472 self,
473 package_label: str,
474 operation: Operation,
475 dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
476 beginning: bool = False,
477 ) -> None:
478 # Operation dependencies are 4-element tuples:
479 # (package_label, model_name, field_name, create/delete as True/False or "alter")
480 operation._auto_deps = dependencies or [] # type: ignore[attr-defined]
481 if beginning:
482 self.generated_operations.setdefault(package_label, []).insert(0, operation)
483 else:
484 self.generated_operations.setdefault(package_label, []).append(operation)
485
486 def generate_renamed_models(self) -> None:
487 """
488 Find any renamed models, generate the operations for them, and remove
489 the old entry from the model lists. Must be run before other
490 model-level generation.
491 """
492 self.renamed_models = {}
493 self.renamed_models_rel = {}
494 added_models = self.new_model_keys - self.old_model_keys
495 for package_label, model_name in sorted(added_models):
496 model_state = self.to_state.models[package_label, model_name]
497 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
498
499 removed_models = self.old_model_keys - self.new_model_keys
500 for rem_package_label, rem_model_name in removed_models:
501 if rem_package_label == package_label:
502 rem_model_state = self.from_state.models[
503 rem_package_label, rem_model_name
504 ]
505 rem_model_fields_def = self.only_relation_agnostic_fields(
506 rem_model_state.fields
507 )
508 if model_fields_def == rem_model_fields_def:
509 if self.questioner.ask_rename_model(
510 rem_model_state, model_state
511 ):
512 dependencies = []
513 fields = list(model_state.fields.values()) + [
514 field.remote_field
515 for relations in self.to_state.relations[
516 package_label, model_name
517 ].values()
518 for field in relations.values()
519 ]
520 for field in fields:
521 if field.is_relation:
522 dependencies.extend(
523 self._get_dependencies_for_foreign_key(
524 package_label,
525 model_name,
526 field,
527 self.to_state,
528 )
529 )
530 self.add_operation(
531 package_label,
532 operations.RenameModel(
533 old_name=rem_model_state.name,
534 new_name=model_state.name,
535 ),
536 dependencies=dependencies,
537 )
538 self.renamed_models[package_label, model_name] = (
539 rem_model_name
540 )
541 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
542 self.renamed_models_rel[renamed_models_rel_key] = (
543 f"{model_state.package_label}.{model_state.name_lower}"
544 )
545 self.old_model_keys.remove(
546 (rem_package_label, rem_model_name)
547 )
548 self.old_model_keys.add((package_label, model_name))
549 break
550
551 def generate_created_models(self) -> None:
552 """
553 Find all new models and make create
554 operations for them as well as separate operations to create any
555 foreign key or M2M relationships (these are optimized later, if
556 possible).
557
558 Defer any model options that refer to collections of fields that might
559 be deferred.
560 """
561 added_models = self.new_model_keys - self.old_model_keys
562
563 for package_label, model_name in added_models:
564 model_state = self.to_state.models[package_label, model_name]
565 # Gather related fields
566 related_fields = {}
567 primary_key_rel = None
568 for field_name, field in model_state.fields.items():
569 if field.remote_field:
570 if field.remote_field.model:
571 if field.primary_key:
572 primary_key_rel = field.remote_field.model
573 else:
574 related_fields[field_name] = field
575 if getattr(field.remote_field, "through", None):
576 related_fields[field_name] = field
577
578 # Are there indexes to defer?
579 indexes = model_state.options.pop("indexes")
580 constraints = model_state.options.pop("constraints")
581 # Depend on the deletion of any possible proxy version of us
582 dependencies = [
583 (package_label, model_name, None, False),
584 ]
585 # Depend on all bases
586 for base in model_state.bases:
587 if isinstance(base, str) and "." in base:
588 base_package_label, base_name = base.split(".", 1)
589 dependencies.append((base_package_label, base_name, None, True))
590 # Depend on the removal of base fields if the new model has
591 # a field with the same name.
592 old_base_model_state = self.from_state.models.get(
593 (base_package_label, base_name)
594 )
595 new_base_model_state = self.to_state.models.get(
596 (base_package_label, base_name)
597 )
598 if old_base_model_state and new_base_model_state:
599 removed_base_fields = (
600 set(old_base_model_state.fields)
601 .difference(
602 new_base_model_state.fields,
603 )
604 .intersection(model_state.fields)
605 )
606 for removed_base_field in removed_base_fields:
607 dependencies.append(
608 (
609 base_package_label,
610 base_name,
611 removed_base_field,
612 False,
613 )
614 )
615 # Depend on the other end of the primary key if it's a relation
616 if primary_key_rel:
617 dependencies.append(
618 resolve_relation(
619 primary_key_rel,
620 package_label,
621 model_name,
622 )
623 + (None, True)
624 )
625 # Generate creation operation
626 self.add_operation(
627 package_label,
628 operations.CreateModel(
629 name=model_state.name,
630 fields=[
631 d
632 for d in model_state.fields.items()
633 if d[0] not in related_fields
634 ],
635 options=model_state.options,
636 bases=model_state.bases,
637 ),
638 dependencies=dependencies,
639 beginning=True,
640 )
641
642 # Generate operations for each related field
643 for name, field in sorted(related_fields.items()):
644 dependencies = self._get_dependencies_for_foreign_key(
645 package_label,
646 model_name,
647 field,
648 self.to_state,
649 )
650 # Depend on our own model being created
651 dependencies.append((package_label, model_name, None, True))
652 # Make operation
653 self.add_operation(
654 package_label,
655 operations.AddField(
656 model_name=model_name,
657 name=name,
658 field=field,
659 ),
660 dependencies=list(set(dependencies)),
661 )
662
663 related_dependencies = [
664 (package_label, model_name, name, True)
665 for name in sorted(related_fields)
666 ]
667 related_dependencies.append((package_label, model_name, None, True))
668 for index in indexes:
669 self.add_operation(
670 package_label,
671 operations.AddIndex(
672 model_name=model_name,
673 index=index,
674 ),
675 dependencies=related_dependencies,
676 )
677 for constraint in constraints:
678 self.add_operation(
679 package_label,
680 operations.AddConstraint(
681 model_name=model_name,
682 constraint=constraint,
683 ),
684 dependencies=related_dependencies,
685 )
686
687 def generate_deleted_models(self) -> None:
688 """
689 Find all deleted models and make delete
690 operations for them as well as separate operations to delete any
691 foreign key or M2M relationships (these are optimized later, if
692 possible).
693
694 Also bring forward removal of any model options that refer to
695 collections of fields - the inverse of generate_created_models().
696 """
697 deleted_models = self.old_model_keys - self.new_model_keys
698
699 for package_label, model_name in sorted(deleted_models):
700 model_state = self.from_state.models[package_label, model_name]
701 # Gather related fields
702 related_fields = {}
703 for field_name, field in model_state.fields.items():
704 if field.remote_field:
705 if field.remote_field.model:
706 related_fields[field_name] = field
707 if getattr(field.remote_field, "through", None):
708 related_fields[field_name] = field
709
710 # Then remove each related field
711 for name in sorted(related_fields):
712 self.add_operation(
713 package_label,
714 operations.RemoveField(
715 model_name=model_name,
716 name=name,
717 ),
718 )
719 # Finally, remove the model.
720 # This depends on both the removal/alteration of all incoming fields
721 # and the removal of all its own related fields, and if it's
722 # a through model the field that references it.
723 dependencies = []
724 relations = self.from_state.relations
725 for (
726 related_object_package_label,
727 object_name,
728 ), relation_related_fields in relations[package_label, model_name].items():
729 for field_name, field in relation_related_fields.items():
730 dependencies.append(
731 (related_object_package_label, object_name, field_name, False),
732 )
733 if not field.many_to_many:
734 dependencies.append(
735 (
736 related_object_package_label,
737 object_name,
738 field_name,
739 "alter",
740 ),
741 )
742
743 for name in sorted(related_fields):
744 dependencies.append((package_label, model_name, name, False))
745 # We're referenced in another field's through=
746 through_user = self.through_users.get(
747 (package_label, model_state.name_lower)
748 )
749 if through_user:
750 dependencies.append(
751 (through_user[0], through_user[1], through_user[2], False)
752 )
753 # Finally, make the operation, deduping any dependencies
754 self.add_operation(
755 package_label,
756 operations.DeleteModel(
757 name=model_state.name,
758 ),
759 dependencies=list(set(dependencies)),
760 )
761
762 def create_renamed_fields(self) -> None:
763 """Work out renamed fields."""
764 self.renamed_operations = []
765 old_field_keys = self.old_field_keys.copy()
766 for package_label, model_name, field_name in sorted(
767 self.new_field_keys - old_field_keys
768 ):
769 old_model_name = self.renamed_models.get(
770 (package_label, model_name), model_name
771 )
772 old_model_state = self.from_state.models[package_label, old_model_name]
773 new_model_state = self.to_state.models[package_label, model_name]
774 field = new_model_state.get_field(field_name)
775 # Scan to see if this is actually a rename!
776 field_dec = self.deep_deconstruct(field)
777 for rem_package_label, rem_model_name, rem_field_name in sorted(
778 old_field_keys - self.new_field_keys
779 ):
780 if rem_package_label == package_label and rem_model_name == model_name:
781 old_field = old_model_state.get_field(rem_field_name)
782 old_field_dec = self.deep_deconstruct(old_field)
783 if (
784 field.remote_field
785 and field.remote_field.model
786 and "to" in old_field_dec[2]
787 ):
788 old_rel_to = old_field_dec[2]["to"]
789 if old_rel_to in self.renamed_models_rel:
790 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
791 old_field.set_attributes_from_name(rem_field_name)
792 old_db_column = old_field.get_attname_column()[1]
793 if old_field_dec == field_dec or (
794 # Was the field renamed and db_column equal to the
795 # old field's column added?
796 old_field_dec[0:2] == field_dec[0:2]
797 and dict(old_field_dec[2], db_column=old_db_column)
798 == field_dec[2]
799 ):
800 if self.questioner.ask_rename(
801 model_name, rem_field_name, field_name, field
802 ):
803 self.renamed_operations.append(
804 (
805 rem_package_label,
806 rem_model_name,
807 old_field.db_column,
808 rem_field_name,
809 package_label,
810 model_name,
811 field,
812 field_name,
813 )
814 )
815 old_field_keys.remove(
816 (rem_package_label, rem_model_name, rem_field_name)
817 )
818 old_field_keys.add((package_label, model_name, field_name))
819 self.renamed_fields[
820 package_label, model_name, field_name
821 ] = rem_field_name
822 break
823
824 def generate_renamed_fields(self) -> None:
825 """Generate RenameField operations."""
826 for (
827 rem_package_label,
828 rem_model_name,
829 rem_db_column,
830 rem_field_name,
831 package_label,
832 model_name,
833 field,
834 field_name,
835 ) in self.renamed_operations:
836 # A db_column mismatch requires a prior noop AlterField for the
837 # subsequent RenameField to be a noop on attempts at preserving the
838 # old name.
839 if rem_db_column != field.db_column:
840 altered_field = field.clone()
841 altered_field.name = rem_field_name
842 self.add_operation(
843 package_label,
844 operations.AlterField(
845 model_name=model_name,
846 name=rem_field_name,
847 field=altered_field,
848 ),
849 )
850 self.add_operation(
851 package_label,
852 operations.RenameField(
853 model_name=model_name,
854 old_name=rem_field_name,
855 new_name=field_name,
856 ),
857 )
858 self.old_field_keys.remove(
859 (rem_package_label, rem_model_name, rem_field_name)
860 )
861 self.old_field_keys.add((package_label, model_name, field_name))
862
863 def generate_added_fields(self) -> None:
864 """Make AddField operations."""
865 for package_label, model_name, field_name in sorted(
866 self.new_field_keys - self.old_field_keys
867 ):
868 self._generate_added_field(package_label, model_name, field_name)
869
870 def _generate_added_field(
871 self, package_label: str, model_name: str, field_name: str
872 ) -> None:
873 field = self.to_state.models[package_label, model_name].get_field(field_name)
874 # Adding a field always depends at least on its removal.
875 dependencies = [(package_label, model_name, field_name, False)]
876 # Fields that are foreignkeys/m2ms depend on stuff.
877 if field.remote_field and field.remote_field.model:
878 dependencies.extend(
879 self._get_dependencies_for_foreign_key(
880 package_label,
881 model_name,
882 field,
883 self.to_state,
884 )
885 )
886 # You can't just add NOT NULL fields with no default or fields
887 # which don't allow empty strings as default.
888 time_fields = (DateField, DateTimeField, TimeField)
889 preserve_default = (
890 field.allow_null
891 or field.has_default()
892 or field.many_to_many
893 or (not field.required and field.empty_strings_allowed)
894 or (isinstance(field, time_fields) and field.auto_now)
895 )
896 if not preserve_default:
897 field = field.clone()
898 if isinstance(field, time_fields) and field.auto_now_add:
899 field.default = self.questioner.ask_auto_now_add_addition(
900 field_name, model_name
901 )
902 else:
903 field.default = self.questioner.ask_not_null_addition(
904 field_name, model_name
905 )
906 if (
907 field.primary_key
908 and field.default is not NOT_PROVIDED
909 and callable(field.default)
910 ):
911 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
912 self.add_operation(
913 package_label,
914 operations.AddField(
915 model_name=model_name,
916 name=field_name,
917 field=field,
918 preserve_default=preserve_default,
919 ),
920 dependencies=dependencies,
921 )
922
923 def generate_removed_fields(self) -> None:
924 """Make RemoveField operations."""
925 for package_label, model_name, field_name in sorted(
926 self.old_field_keys - self.new_field_keys
927 ):
928 self._generate_removed_field(package_label, model_name, field_name)
929
930 def _generate_removed_field(
931 self, package_label: str, model_name: str, field_name: str
932 ) -> None:
933 self.add_operation(
934 package_label,
935 operations.RemoveField(
936 model_name=model_name,
937 name=field_name,
938 ),
939 )
940
941 def generate_altered_fields(self) -> None:
942 """
943 Make AlterField operations, or possibly RemovedField/AddField if alter
944 isn't possible.
945 """
946 for package_label, model_name, field_name in sorted(
947 self.old_field_keys & self.new_field_keys
948 ):
949 # Did the field change?
950 old_model_name = self.renamed_models.get(
951 (package_label, model_name), model_name
952 )
953 old_field_name = self.renamed_fields.get(
954 (package_label, model_name, field_name), field_name
955 )
956 old_field = self.from_state.models[package_label, old_model_name].get_field(
957 old_field_name
958 )
959 new_field = self.to_state.models[package_label, model_name].get_field(
960 field_name
961 )
962 dependencies = []
963 # Implement any model renames on relations; these are handled by RenameModel
964 # so we need to exclude them from the comparison
965 if hasattr(new_field, "remote_field") and getattr(
966 new_field.remote_field, "model", None
967 ):
968 rename_key = resolve_relation(
969 new_field.remote_field.model, package_label, model_name
970 )
971 if rename_key in self.renamed_models:
972 new_field.remote_field.model = old_field.remote_field.model
973 # Handle ForeignKey which can only have a single to_field.
974 remote_field_name = getattr(new_field.remote_field, "field_name", None)
975 if remote_field_name:
976 to_field_rename_key = rename_key + (remote_field_name,)
977 if to_field_rename_key in self.renamed_fields:
978 # Repoint model name only
979 new_field.remote_field.model = old_field.remote_field.model
980 dependencies.extend(
981 self._get_dependencies_for_foreign_key(
982 package_label,
983 model_name,
984 new_field,
985 self.to_state,
986 )
987 )
988 if hasattr(new_field, "remote_field") and getattr(
989 new_field.remote_field, "through", None
990 ):
991 rename_key = resolve_relation(
992 new_field.remote_field.through, package_label, model_name
993 )
994 if rename_key in self.renamed_models:
995 new_field.remote_field.through = old_field.remote_field.through
996 old_field_dec = self.deep_deconstruct(old_field)
997 new_field_dec = self.deep_deconstruct(new_field)
998 # If the field was confirmed to be renamed it means that only
999 # db_column was allowed to change which generate_renamed_fields()
1000 # already accounts for by adding an AlterField operation.
1001 if old_field_dec != new_field_dec and old_field_name == field_name:
1002 both_m2m = old_field.many_to_many and new_field.many_to_many
1003 neither_m2m = not old_field.many_to_many and not new_field.many_to_many
1004 if both_m2m or neither_m2m:
1005 # Either both fields are m2m or neither is
1006 preserve_default = True
1007 if (
1008 old_field.allow_null
1009 and not new_field.allow_null
1010 and not new_field.has_default()
1011 and not new_field.many_to_many
1012 ):
1013 field = new_field.clone()
1014 new_default = self.questioner.ask_not_null_alteration(
1015 field_name, model_name
1016 )
1017 if new_default is not NOT_PROVIDED:
1018 field.default = new_default
1019 preserve_default = False
1020 else:
1021 field = new_field
1022 self.add_operation(
1023 package_label,
1024 operations.AlterField(
1025 model_name=model_name,
1026 name=field_name,
1027 field=field,
1028 preserve_default=preserve_default,
1029 ),
1030 dependencies=dependencies,
1031 )
1032 else:
1033 # We cannot alter between m2m and concrete fields
1034 self._generate_removed_field(package_label, model_name, field_name)
1035 self._generate_added_field(package_label, model_name, field_name)
1036
1037 def create_altered_indexes(self) -> None:
1038 option_name = operations.AddIndex.option_name
1039
1040 for package_label, model_name in sorted(self.kept_model_keys):
1041 old_model_name = self.renamed_models.get(
1042 (package_label, model_name), model_name
1043 )
1044 old_model_state = self.from_state.models[package_label, old_model_name]
1045 new_model_state = self.to_state.models[package_label, model_name]
1046
1047 old_indexes = old_model_state.options[option_name]
1048 new_indexes = new_model_state.options[option_name]
1049 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1050 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1051 renamed_indexes = []
1052 # Find renamed indexes.
1053 remove_from_added = []
1054 remove_from_removed = []
1055 for new_index in added_indexes:
1056 new_index_dec = new_index.deconstruct()
1057 new_index_name = new_index_dec[2].pop("name")
1058 for old_index in removed_indexes:
1059 old_index_dec = old_index.deconstruct()
1060 old_index_name = old_index_dec[2].pop("name")
1061 # Indexes are the same except for the names.
1062 if (
1063 new_index_dec == old_index_dec
1064 and new_index_name != old_index_name
1065 ):
1066 renamed_indexes.append((old_index_name, new_index_name, None))
1067 remove_from_added.append(new_index)
1068 remove_from_removed.append(old_index)
1069
1070 # Remove renamed indexes from the lists of added and removed
1071 # indexes.
1072 added_indexes = [
1073 idx for idx in added_indexes if idx not in remove_from_added
1074 ]
1075 removed_indexes = [
1076 idx for idx in removed_indexes if idx not in remove_from_removed
1077 ]
1078
1079 self.altered_indexes.update(
1080 {
1081 (package_label, model_name): {
1082 "added_indexes": added_indexes,
1083 "removed_indexes": removed_indexes,
1084 "renamed_indexes": renamed_indexes,
1085 }
1086 }
1087 )
1088
1089 def generate_added_indexes(self) -> None:
1090 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1091 dependencies = self._get_dependencies_for_model(package_label, model_name)
1092 for index in alt_indexes["added_indexes"]:
1093 self.add_operation(
1094 package_label,
1095 operations.AddIndex(
1096 model_name=model_name,
1097 index=index,
1098 ),
1099 dependencies=dependencies,
1100 )
1101
1102 def generate_removed_indexes(self) -> None:
1103 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1104 for index in alt_indexes["removed_indexes"]:
1105 self.add_operation(
1106 package_label,
1107 operations.RemoveIndex(
1108 model_name=model_name,
1109 name=index.name,
1110 ),
1111 )
1112
1113 def generate_renamed_indexes(self) -> None:
1114 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1115 for old_index_name, new_index_name, old_fields in alt_indexes[
1116 "renamed_indexes"
1117 ]:
1118 self.add_operation(
1119 package_label,
1120 operations.RenameIndex(
1121 model_name=model_name,
1122 new_name=new_index_name,
1123 old_name=old_index_name,
1124 old_fields=old_fields,
1125 ),
1126 )
1127
1128 def create_altered_constraints(self) -> None:
1129 option_name = operations.AddConstraint.option_name
1130 for package_label, model_name in sorted(self.kept_model_keys):
1131 old_model_name = self.renamed_models.get(
1132 (package_label, model_name), model_name
1133 )
1134 old_model_state = self.from_state.models[package_label, old_model_name]
1135 new_model_state = self.to_state.models[package_label, model_name]
1136
1137 old_constraints = old_model_state.options[option_name]
1138 new_constraints = new_model_state.options[option_name]
1139 add_constraints = [c for c in new_constraints if c not in old_constraints]
1140 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1141
1142 self.altered_constraints.update(
1143 {
1144 (package_label, model_name): {
1145 "added_constraints": add_constraints,
1146 "removed_constraints": rem_constraints,
1147 }
1148 }
1149 )
1150
1151 def generate_added_constraints(self) -> None:
1152 for (
1153 package_label,
1154 model_name,
1155 ), alt_constraints in self.altered_constraints.items():
1156 dependencies = self._get_dependencies_for_model(package_label, model_name)
1157 for constraint in alt_constraints["added_constraints"]:
1158 self.add_operation(
1159 package_label,
1160 operations.AddConstraint(
1161 model_name=model_name,
1162 constraint=constraint,
1163 ),
1164 dependencies=dependencies,
1165 )
1166
1167 def generate_removed_constraints(self) -> None:
1168 for (
1169 package_label,
1170 model_name,
1171 ), alt_constraints in self.altered_constraints.items():
1172 for constraint in alt_constraints["removed_constraints"]:
1173 self.add_operation(
1174 package_label,
1175 operations.RemoveConstraint(
1176 model_name=model_name,
1177 name=constraint.name,
1178 ),
1179 )
1180
1181 @staticmethod
1182 def _get_dependencies_for_foreign_key(
1183 package_label: str, model_name: str, field: Field, project_state: ProjectState
1184 ) -> list[tuple[str, str, str | None, bool | str]]:
1185 remote_field_model = None
1186 if hasattr(field.remote_field, "model"):
1187 remote_field_model = field.remote_field.model
1188 else:
1189 relations = project_state.relations[package_label, model_name]
1190 for (remote_package_label, remote_model_name), fields in relations.items():
1191 if any(
1192 field == related_field.remote_field
1193 for related_field in fields.values()
1194 ):
1195 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1196 break
1197 dep_package_label, dep_object_name = resolve_relation(
1198 remote_field_model,
1199 package_label,
1200 model_name,
1201 )
1202 dependencies = [(dep_package_label, dep_object_name, None, True)]
1203 if getattr(field.remote_field, "through", None):
1204 through_package_label, through_object_name = resolve_relation(
1205 field.remote_field.through, # type: ignore[attr-defined]
1206 package_label,
1207 model_name,
1208 )
1209 dependencies.append(
1210 (through_package_label, through_object_name, None, True)
1211 )
1212 return dependencies
1213
1214 def _get_dependencies_for_model(
1215 self, package_label: str, model_name: str
1216 ) -> list[tuple[str, str, str | None, bool | str]]:
1217 """Return foreign key dependencies of the given model."""
1218 dependencies = []
1219 model_state = self.to_state.models[package_label, model_name]
1220 for field in model_state.fields.values():
1221 if field.is_relation:
1222 dependencies.extend(
1223 self._get_dependencies_for_foreign_key(
1224 package_label,
1225 model_name,
1226 field,
1227 self.to_state,
1228 )
1229 )
1230 return dependencies
1231
1232 def generate_altered_db_table(self) -> None:
1233 for package_label, model_name in sorted(self.kept_model_keys):
1234 old_model_name = self.renamed_models.get(
1235 (package_label, model_name), model_name
1236 )
1237 old_model_state = self.from_state.models[package_label, old_model_name]
1238 new_model_state = self.to_state.models[package_label, model_name]
1239 old_db_table_name = old_model_state.options.get("db_table")
1240 new_db_table_name = new_model_state.options.get("db_table")
1241 if old_db_table_name != new_db_table_name:
1242 self.add_operation(
1243 package_label,
1244 operations.AlterModelTable(
1245 name=model_name,
1246 table=new_db_table_name,
1247 ),
1248 )
1249
1250 def generate_altered_db_table_comment(self) -> None:
1251 for package_label, model_name in sorted(self.kept_model_keys):
1252 old_model_name = self.renamed_models.get(
1253 (package_label, model_name), model_name
1254 )
1255 old_model_state = self.from_state.models[package_label, old_model_name]
1256 new_model_state = self.to_state.models[package_label, model_name]
1257
1258 old_db_table_comment = old_model_state.options.get("db_table_comment")
1259 new_db_table_comment = new_model_state.options.get("db_table_comment")
1260 if old_db_table_comment != new_db_table_comment:
1261 self.add_operation(
1262 package_label,
1263 operations.AlterModelTableComment(
1264 name=model_name,
1265 table_comment=new_db_table_comment,
1266 ),
1267 )
1268
1269 def generate_altered_options(self) -> None:
1270 """
1271 Work out if any non-schema-affecting options have changed and make an
1272 operation to represent them in state changes (in case Python code in
1273 migrations needs them).
1274 """
1275 for package_label, model_name in sorted(self.kept_model_keys):
1276 old_model_name = self.renamed_models.get(
1277 (package_label, model_name), model_name
1278 )
1279 old_model_state = self.from_state.models[package_label, old_model_name]
1280 new_model_state = self.to_state.models[package_label, model_name]
1281 old_options = {
1282 key: value
1283 for key, value in old_model_state.options.items()
1284 if key in AlterModelOptions.ALTER_OPTION_KEYS
1285 }
1286 new_options = {
1287 key: value
1288 for key, value in new_model_state.options.items()
1289 if key in AlterModelOptions.ALTER_OPTION_KEYS
1290 }
1291 if old_options != new_options:
1292 self.add_operation(
1293 package_label,
1294 operations.AlterModelOptions(
1295 name=model_name,
1296 options=new_options,
1297 ),
1298 )
1299
1300 def arrange_for_graph(
1301 self,
1302 changes: dict[str, list[Migration]],
1303 graph: MigrationGraph,
1304 migration_name: str | None = None,
1305 ) -> dict[str, list[Migration]]:
1306 """
1307 Take a result from changes() and a MigrationGraph, and fix the names
1308 and dependencies of the changes so they extend the graph from the leaf
1309 nodes for each app.
1310 """
1311 leaves = graph.leaf_nodes()
1312 name_map = {}
1313 for package_label, migrations in list(changes.items()):
1314 if not migrations:
1315 continue
1316 # Find the app label's current leaf node
1317 app_leaf = None
1318 for leaf in leaves:
1319 if leaf[0] == package_label:
1320 app_leaf = leaf
1321 break
1322 # Do they want an initial migration for this app?
1323 if app_leaf is None and not self.questioner.ask_initial(package_label):
1324 # They don't.
1325 for migration in migrations:
1326 name_map[(package_label, migration.name)] = (
1327 package_label,
1328 "__first__",
1329 )
1330 del changes[package_label]
1331 continue
1332 # Work out the next number in the sequence
1333 if app_leaf is None:
1334 next_number = 1
1335 else:
1336 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1337 # Name each migration
1338 for i, migration in enumerate(migrations):
1339 if i == 0 and app_leaf:
1340 migration.dependencies.append(app_leaf)
1341 new_name_parts = ["%04i" % next_number] # noqa: UP031
1342 if migration_name:
1343 new_name_parts.append(migration_name)
1344 elif i == 0 and not app_leaf:
1345 new_name_parts.append("initial")
1346 else:
1347 new_name_parts.append(migration.suggest_name()[:100])
1348 new_name = "_".join(new_name_parts)
1349 name_map[(package_label, migration.name)] = (package_label, new_name)
1350 next_number += 1
1351 migration.name = new_name
1352 # Now fix dependencies
1353 for migrations in changes.values():
1354 for migration in migrations:
1355 migration.dependencies = [
1356 name_map.get(d, d) for d in migration.dependencies
1357 ]
1358 return changes
1359
1360 def _trim_to_packages(
1361 self, changes: dict[str, list[Migration]], package_labels: set[str]
1362 ) -> dict[str, list[Migration]]:
1363 """
1364 Take changes from arrange_for_graph() and set of app labels, and return
1365 a modified set of changes which trims out as many migrations that are
1366 not in package_labels as possible. Note that some other migrations may
1367 still be present as they may be required dependencies.
1368 """
1369 # Gather other app dependencies in a first pass
1370 app_dependencies = {}
1371 for package_label, migrations in changes.items():
1372 for migration in migrations:
1373 for dep_package_label, name in migration.dependencies:
1374 app_dependencies.setdefault(package_label, set()).add(
1375 dep_package_label
1376 )
1377 required_packages = set(package_labels)
1378 # Keep resolving till there's no change
1379 old_required_packages = None
1380 while old_required_packages != required_packages:
1381 old_required_packages = set(required_packages)
1382 required_packages.update(
1383 *[
1384 app_dependencies.get(package_label, ())
1385 for package_label in required_packages
1386 ]
1387 )
1388 # Remove all migrations that aren't needed
1389 for package_label in list(changes):
1390 if package_label not in required_packages:
1391 del changes[package_label]
1392 return changes
1393
1394 @classmethod
1395 def parse_number(cls, name: str) -> int | None:
1396 """
1397 Given a migration name, try to extract a number from the beginning of
1398 it. For a squashed migration such as '0001_squashed_0004…', return the
1399 second number. If no number is found, return None.
1400 """
1401 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1402 return int(squashed_match[1])
1403 match = re.match(r"^\d+", name)
1404 if match:
1405 return int(match[0])
1406 return None