1from __future__ import annotations
2
3import functools
4import re
5from graphlib import TopologicalSorter
6from typing import TYPE_CHECKING, Any
7
8from plain import models
9from plain.models.migrations import operations
10from plain.models.migrations.migration import Migration, SettingsTuple
11from plain.models.migrations.operations.models import AlterModelOptions
12from plain.models.migrations.optimizer import MigrationOptimizer
13from plain.models.migrations.questioner import MigrationQuestioner
14from plain.models.migrations.utils import (
15 COMPILED_REGEX_TYPE,
16 RegexObject,
17 resolve_relation,
18)
19from plain.runtime import settings
20
21if TYPE_CHECKING:
22 from plain.models.fields import Field
23 from plain.models.migrations.graph import MigrationGraph
24 from plain.models.migrations.operations.base import Operation
25 from plain.models.migrations.state import ProjectState
26
27
28class MigrationAutodetector:
29 """
30 Take a pair of ProjectStates and compare them to see what the first would
31 need doing to make it match the second (the second usually being the
32 project's current state).
33
34 Note that this naturally operates on entire projects at a time,
35 as it's likely that changes interact (for example, you can't
36 add a ForeignKey without having a migration to add the table it
37 depends on first). A user interface may offer single-app usage
38 if it wishes, with the caveat that it may not always be possible.
39 """
40
41 def __init__(
42 self,
43 from_state: ProjectState,
44 to_state: ProjectState,
45 questioner: MigrationQuestioner | None = None,
46 ):
47 self.from_state = from_state
48 self.to_state = to_state
49 self.questioner = questioner or MigrationQuestioner()
50 self.existing_packages = {app for app, model in from_state.models}
51
52 def changes(
53 self,
54 graph: MigrationGraph,
55 trim_to_packages: set[str] | None = None,
56 convert_packages: set[str] | None = None,
57 migration_name: str | None = None,
58 ) -> dict[str, list[Migration]]:
59 """
60 Main entry point to produce a list of applicable changes.
61 Take a graph to base names on and an optional set of packages
62 to try and restrict to (restriction is not guaranteed)
63 """
64 changes = self._detect_changes(convert_packages, graph)
65 changes = self.arrange_for_graph(changes, graph, migration_name)
66 if trim_to_packages:
67 changes = self._trim_to_packages(changes, trim_to_packages)
68 return changes
69
70 def deep_deconstruct(self, obj: Any) -> Any:
71 """
72 Recursive deconstruction for a field and its arguments.
73 Used for full comparison for rename/alter; sometimes a single-level
74 deconstruction will not compare correctly.
75 """
76 if isinstance(obj, list):
77 return [self.deep_deconstruct(value) for value in obj]
78 elif isinstance(obj, tuple):
79 return tuple(self.deep_deconstruct(value) for value in obj)
80 elif isinstance(obj, dict):
81 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
82 elif isinstance(obj, functools.partial):
83 return (
84 obj.func,
85 self.deep_deconstruct(obj.args),
86 self.deep_deconstruct(obj.keywords),
87 )
88 elif isinstance(obj, COMPILED_REGEX_TYPE):
89 return RegexObject(obj)
90 elif isinstance(obj, type):
91 # If this is a type that implements 'deconstruct' as an instance method,
92 # avoid treating this as being deconstructible itself - see #22951
93 return obj
94 elif hasattr(obj, "deconstruct"):
95 deconstructed = obj.deconstruct()
96 if isinstance(obj, models.Field):
97 # we have a field which also returns a name
98 deconstructed = deconstructed[1:]
99 path, args, kwargs = deconstructed
100 return (
101 path,
102 [self.deep_deconstruct(value) for value in args],
103 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
104 )
105 else:
106 return obj
107
108 def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
109 """
110 Return a definition of the fields that ignores field names and
111 what related fields actually relate to. Used for detecting renames (as
112 the related fields change during renames).
113 """
114 fields_def = []
115 for name, field in sorted(fields.items()):
116 deconstruction = self.deep_deconstruct(field)
117 if field.remote_field and field.remote_field.model:
118 deconstruction[2].pop("to", None)
119 fields_def.append(deconstruction)
120 return fields_def
121
122 def _detect_changes(
123 self,
124 convert_packages: set[str] | None = None,
125 graph: MigrationGraph | None = None,
126 ) -> dict[str, list[Migration]]:
127 """
128 Return a dict of migration plans which will achieve the
129 change from from_state to to_state. The dict has app labels
130 as keys and a list of migrations as values.
131
132 The resulting migrations aren't specially named, but the names
133 do matter for dependencies inside the set.
134
135 convert_packages is the list of packages to convert to use migrations
136 (i.e. to make initial migrations for, in the usual case)
137
138 graph is an optional argument that, if provided, can help improve
139 dependency generation and avoid potential circular dependencies.
140 """
141 # The first phase is generating all the operations for each app
142 # and gathering them into a big per-app list.
143 # Then go through that list, order it, and split into migrations to
144 # resolve dependencies caused by M2Ms and FKs.
145 self.generated_operations = {}
146 self.altered_indexes = {}
147 self.altered_constraints = {}
148 self.renamed_fields = {}
149
150 # Prepare some old/new state and model lists, ignoring unmigrated packages.
151 self.old_model_keys = set()
152 self.new_model_keys = set()
153 for (package_label, model_name), model_state in self.from_state.models.items():
154 if package_label not in self.from_state.real_packages:
155 self.old_model_keys.add((package_label, model_name))
156
157 for (package_label, model_name), model_state in self.to_state.models.items():
158 if package_label not in self.from_state.real_packages or (
159 convert_packages and package_label in convert_packages
160 ):
161 self.new_model_keys.add((package_label, model_name))
162
163 self.from_state.resolve_fields_and_relations()
164 self.to_state.resolve_fields_and_relations()
165
166 # Renames have to come first
167 self.generate_renamed_models()
168
169 # Prepare lists of fields and generate through model map
170 self._prepare_field_lists()
171 self._generate_through_model_map()
172
173 # Generate non-rename model operations
174 self.generate_deleted_models()
175 self.generate_created_models()
176 self.generate_altered_options()
177 self.generate_altered_db_table_comment()
178
179 # Create the renamed fields and store them in self.renamed_fields.
180 # They are used by create_altered_indexes(), generate_altered_fields(),
181 # generate_removed_altered_index(), and
182 # generate_altered_index().
183 self.create_renamed_fields()
184 # Create the altered indexes and store them in self.altered_indexes.
185 # This avoids the same computation in generate_removed_indexes()
186 # and generate_added_indexes().
187 self.create_altered_indexes()
188 self.create_altered_constraints()
189 # Generate index removal operations before field is removed
190 self.generate_removed_constraints()
191 self.generate_removed_indexes()
192 # Generate field renaming operations.
193 self.generate_renamed_fields()
194 self.generate_renamed_indexes()
195 # Generate field operations.
196 self.generate_removed_fields()
197 self.generate_added_fields()
198 self.generate_altered_fields()
199 self.generate_added_indexes()
200 self.generate_added_constraints()
201 self.generate_altered_db_table()
202
203 self._sort_migrations()
204 self._build_migration_list(graph)
205 self._optimize_migrations()
206
207 return self.migrations
208
209 def _prepare_field_lists(self) -> None:
210 """
211 Prepare field lists and a list of the fields that used through models
212 in the old state so dependencies can be made from the through model
213 deletion to the field that uses it.
214 """
215 self.kept_model_keys = self.old_model_keys & self.new_model_keys
216 self.through_users = {}
217 self.old_field_keys = {
218 (package_label, model_name, field_name)
219 for package_label, model_name in self.kept_model_keys
220 for field_name in self.from_state.models[
221 package_label,
222 self.renamed_models.get((package_label, model_name), model_name),
223 ].fields
224 }
225 self.new_field_keys = {
226 (package_label, model_name, field_name)
227 for package_label, model_name in self.kept_model_keys
228 for field_name in self.to_state.models[package_label, model_name].fields
229 }
230
231 def _generate_through_model_map(self) -> None:
232 """Through model map generation."""
233 for package_label, model_name in sorted(self.old_model_keys):
234 old_model_name = self.renamed_models.get(
235 (package_label, model_name), model_name
236 )
237 old_model_state = self.from_state.models[package_label, old_model_name]
238 for field_name, field in old_model_state.fields.items():
239 if hasattr(field, "remote_field") and getattr(
240 field.remote_field, "through", None
241 ):
242 through_key = resolve_relation(
243 field.remote_field.through, package_label, model_name
244 )
245 self.through_users[through_key] = (
246 package_label,
247 old_model_name,
248 field_name,
249 )
250
251 @staticmethod
252 def _resolve_dependency(
253 dependency: tuple[str, ...],
254 ) -> tuple[tuple[str, ...], bool]:
255 """
256 Return the resolved dependency and a boolean denoting whether or not
257 it was a settings dependency.
258 """
259 if not isinstance(dependency, SettingsTuple):
260 return dependency, False
261 resolved_package_label, resolved_object_name = getattr(
262 settings, dependency[1]
263 ).split(".")
264 return (resolved_package_label, resolved_object_name.lower()) + dependency[
265 2:
266 ], True
267
268 def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
269 """
270 Chop the lists of operations up into migrations with dependencies on
271 each other. Do this by going through an app's list of operations until
272 one is found that has an outgoing dependency that isn't in another
273 app's migration yet (hasn't been chopped off its list). Then chop off
274 the operations before it into a migration and move onto the next app.
275 If the loops completes without doing anything, there's a circular
276 dependency (which _should_ be impossible as the operations are
277 all split at this point so they can't depend and be depended on).
278 """
279 self.migrations = {}
280 num_ops = sum(len(x) for x in self.generated_operations.values())
281 chop_mode = False
282 while num_ops:
283 # On every iteration, we step through all the packages and see if there
284 # is a completed set of operations.
285 # If we find that a subset of the operations are complete we can
286 # try to chop it off from the rest and continue, but we only
287 # do this if we've already been through the list once before
288 # without any chopping and nothing has changed.
289 for package_label in sorted(self.generated_operations):
290 chopped = []
291 dependencies = set()
292 for operation in list(self.generated_operations[package_label]):
293 deps_satisfied = True
294 operation_dependencies = set()
295 for dep in operation._auto_deps:
296 # Temporarily resolve the settings dependency to
297 # prevent circular references. While keeping the
298 # dependency checks on the resolved model, add the
299 # settings dependencies.
300 original_dep = dep
301 dep, is_settings_dep = self._resolve_dependency(dep)
302 if dep[0] != package_label:
303 # External app dependency. See if it's not yet
304 # satisfied.
305 for other_operation in self.generated_operations.get(
306 dep[0], []
307 ):
308 if self.check_dependency(other_operation, dep):
309 deps_satisfied = False
310 break
311 if not deps_satisfied:
312 break
313 else:
314 if is_settings_dep:
315 operation_dependencies.add(
316 (original_dep[0], original_dep[1])
317 )
318 elif dep[0] in self.migrations:
319 operation_dependencies.add(
320 (dep[0], self.migrations[dep[0]][-1].name)
321 )
322 else:
323 # If we can't find the other app, we add a
324 # first/last dependency, but only if we've
325 # already been through once and checked
326 # everything.
327 if chop_mode:
328 # If the app already exists, we add a
329 # dependency on the last migration, as
330 # we don't know which migration
331 # contains the target field. If it's
332 # not yet migrated or has no
333 # migrations, we use __first__.
334 if graph and graph.leaf_nodes(dep[0]):
335 operation_dependencies.add(
336 graph.leaf_nodes(dep[0])[0]
337 )
338 else:
339 operation_dependencies.add(
340 (dep[0], "__first__")
341 )
342 else:
343 deps_satisfied = False
344 if deps_satisfied:
345 chopped.append(operation)
346 dependencies.update(operation_dependencies)
347 del self.generated_operations[package_label][0]
348 else:
349 break
350 # Make a migration! Well, only if there's stuff to put in it
351 if dependencies or chopped:
352 if not self.generated_operations[package_label] or chop_mode:
353 subclass = type(
354 "Migration",
355 (Migration,),
356 {"operations": [], "dependencies": []},
357 )
358 instance = subclass(
359 "auto_%i" # noqa: UP031
360 % (len(self.migrations.get(package_label, [])) + 1),
361 package_label,
362 )
363 instance.dependencies = list(dependencies)
364 instance.operations = chopped
365 instance.initial = package_label not in self.existing_packages
366 self.migrations.setdefault(package_label, []).append(instance)
367 chop_mode = False
368 else:
369 self.generated_operations[package_label] = (
370 chopped + self.generated_operations[package_label]
371 )
372 new_num_ops = sum(len(x) for x in self.generated_operations.values())
373 if new_num_ops == num_ops:
374 if not chop_mode:
375 chop_mode = True
376 else:
377 raise ValueError(
378 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
379 )
380 num_ops = new_num_ops
381
382 def _sort_migrations(self) -> None:
383 """
384 Reorder to make things possible. Reordering may be needed so FKs work
385 nicely inside the same app.
386 """
387 for package_label, ops in sorted(self.generated_operations.items()):
388 ts = TopologicalSorter()
389 for op in ops:
390 ts.add(op)
391 for dep in op._auto_deps:
392 # Resolve intra-app dependencies to handle circular
393 # references involving a settings model.
394 dep = self._resolve_dependency(dep)[0]
395 if dep[0] != package_label:
396 continue
397 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
398 self.generated_operations[package_label] = list(ts.static_order())
399
400 def _optimize_migrations(self) -> None:
401 # Add in internal dependencies among the migrations
402 for package_label, migrations in self.migrations.items():
403 for m1, m2 in zip(migrations, migrations[1:]):
404 m2.dependencies.append((package_label, m1.name))
405
406 # De-dupe dependencies
407 for migrations in self.migrations.values():
408 for migration in migrations:
409 migration.dependencies = list(set(migration.dependencies))
410
411 # Optimize migrations
412 for package_label, migrations in self.migrations.items():
413 for migration in migrations:
414 migration.operations = MigrationOptimizer().optimize(
415 migration.operations, package_label
416 )
417
418 def check_dependency(
419 self, operation: Operation, dependency: tuple[str, ...]
420 ) -> bool:
421 """
422 Return True if the given operation depends on the given dependency,
423 False otherwise.
424 """
425 # Created model
426 if dependency[2] is None and dependency[3] is True:
427 return (
428 isinstance(operation, operations.CreateModel)
429 and operation.name_lower == dependency[1].lower()
430 )
431 # Created field
432 elif dependency[2] is not None and dependency[3] is True:
433 return (
434 isinstance(operation, operations.CreateModel)
435 and operation.name_lower == dependency[1].lower()
436 and any(dependency[2] == x for x, y in operation.fields)
437 ) or (
438 isinstance(operation, operations.AddField)
439 and operation.model_name_lower == dependency[1].lower()
440 and operation.name_lower == dependency[2].lower()
441 )
442 # Removed field
443 elif dependency[2] is not None and dependency[3] is False:
444 return (
445 isinstance(operation, operations.RemoveField)
446 and operation.model_name_lower == dependency[1].lower()
447 and operation.name_lower == dependency[2].lower()
448 )
449 # Removed model
450 elif dependency[2] is None and dependency[3] is False:
451 return (
452 isinstance(operation, operations.DeleteModel)
453 and operation.name_lower == dependency[1].lower()
454 )
455 # Field being altered
456 elif dependency[2] is not None and dependency[3] == "alter":
457 return (
458 isinstance(operation, operations.AlterField)
459 and operation.model_name_lower == dependency[1].lower()
460 and operation.name_lower == dependency[2].lower()
461 )
462 # Unknown dependency. Raise an error.
463 else:
464 raise ValueError(f"Can't handle dependency {dependency!r}")
465
466 def add_operation(
467 self,
468 package_label: str,
469 operation: Operation,
470 dependencies: list[tuple[str, ...]] | None = None,
471 beginning: bool = False,
472 ) -> None:
473 # Dependencies are
474 # (package_label, model_name, field_name, create/delete as True/False)
475 operation._auto_deps = dependencies or [] # type: ignore[attr-defined]
476 if beginning:
477 self.generated_operations.setdefault(package_label, []).insert(0, operation)
478 else:
479 self.generated_operations.setdefault(package_label, []).append(operation)
480
481 def generate_renamed_models(self) -> None:
482 """
483 Find any renamed models, generate the operations for them, and remove
484 the old entry from the model lists. Must be run before other
485 model-level generation.
486 """
487 self.renamed_models = {}
488 self.renamed_models_rel = {}
489 added_models = self.new_model_keys - self.old_model_keys
490 for package_label, model_name in sorted(added_models):
491 model_state = self.to_state.models[package_label, model_name]
492 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
493
494 removed_models = self.old_model_keys - self.new_model_keys
495 for rem_package_label, rem_model_name in removed_models:
496 if rem_package_label == package_label:
497 rem_model_state = self.from_state.models[
498 rem_package_label, rem_model_name
499 ]
500 rem_model_fields_def = self.only_relation_agnostic_fields(
501 rem_model_state.fields
502 )
503 if model_fields_def == rem_model_fields_def:
504 if self.questioner.ask_rename_model(
505 rem_model_state, model_state
506 ):
507 dependencies = []
508 fields = list(model_state.fields.values()) + [
509 field.remote_field
510 for relations in self.to_state.relations[
511 package_label, model_name
512 ].values()
513 for field in relations.values()
514 ]
515 for field in fields:
516 if field.is_relation:
517 dependencies.extend(
518 self._get_dependencies_for_foreign_key(
519 package_label,
520 model_name,
521 field,
522 self.to_state,
523 )
524 )
525 self.add_operation(
526 package_label,
527 operations.RenameModel(
528 old_name=rem_model_state.name,
529 new_name=model_state.name,
530 ),
531 dependencies=dependencies,
532 )
533 self.renamed_models[package_label, model_name] = (
534 rem_model_name
535 )
536 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
537 self.renamed_models_rel[renamed_models_rel_key] = (
538 f"{model_state.package_label}.{model_state.name_lower}"
539 )
540 self.old_model_keys.remove(
541 (rem_package_label, rem_model_name)
542 )
543 self.old_model_keys.add((package_label, model_name))
544 break
545
546 def generate_created_models(self) -> None:
547 """
548 Find all new models and make create
549 operations for them as well as separate operations to create any
550 foreign key or M2M relationships (these are optimized later, if
551 possible).
552
553 Defer any model options that refer to collections of fields that might
554 be deferred.
555 """
556 added_models = self.new_model_keys - self.old_model_keys
557
558 for package_label, model_name in added_models:
559 model_state = self.to_state.models[package_label, model_name]
560 # Gather related fields
561 related_fields = {}
562 primary_key_rel = None
563 for field_name, field in model_state.fields.items():
564 if field.remote_field:
565 if field.remote_field.model:
566 if field.primary_key:
567 primary_key_rel = field.remote_field.model
568 else:
569 related_fields[field_name] = field
570 if getattr(field.remote_field, "through", None):
571 related_fields[field_name] = field
572
573 # Are there indexes to defer?
574 indexes = model_state.options.pop("indexes")
575 constraints = model_state.options.pop("constraints")
576 # Depend on the deletion of any possible proxy version of us
577 dependencies = [
578 (package_label, model_name, None, False),
579 ]
580 # Depend on all bases
581 for base in model_state.bases:
582 if isinstance(base, str) and "." in base:
583 base_package_label, base_name = base.split(".", 1)
584 dependencies.append((base_package_label, base_name, None, True))
585 # Depend on the removal of base fields if the new model has
586 # a field with the same name.
587 old_base_model_state = self.from_state.models.get(
588 (base_package_label, base_name)
589 )
590 new_base_model_state = self.to_state.models.get(
591 (base_package_label, base_name)
592 )
593 if old_base_model_state and new_base_model_state:
594 removed_base_fields = (
595 set(old_base_model_state.fields)
596 .difference(
597 new_base_model_state.fields,
598 )
599 .intersection(model_state.fields)
600 )
601 for removed_base_field in removed_base_fields:
602 dependencies.append(
603 (
604 base_package_label,
605 base_name,
606 removed_base_field,
607 False,
608 )
609 )
610 # Depend on the other end of the primary key if it's a relation
611 if primary_key_rel:
612 dependencies.append(
613 resolve_relation(
614 primary_key_rel,
615 package_label,
616 model_name,
617 )
618 + (None, True)
619 )
620 # Generate creation operation
621 self.add_operation(
622 package_label,
623 operations.CreateModel(
624 name=model_state.name,
625 fields=[
626 d
627 for d in model_state.fields.items()
628 if d[0] not in related_fields
629 ],
630 options=model_state.options,
631 bases=model_state.bases,
632 ),
633 dependencies=dependencies,
634 beginning=True,
635 )
636
637 # Generate operations for each related field
638 for name, field in sorted(related_fields.items()):
639 dependencies = self._get_dependencies_for_foreign_key(
640 package_label,
641 model_name,
642 field,
643 self.to_state,
644 )
645 # Depend on our own model being created
646 dependencies.append((package_label, model_name, None, True))
647 # Make operation
648 self.add_operation(
649 package_label,
650 operations.AddField(
651 model_name=model_name,
652 name=name,
653 field=field,
654 ),
655 dependencies=list(set(dependencies)),
656 )
657
658 related_dependencies = [
659 (package_label, model_name, name, True)
660 for name in sorted(related_fields)
661 ]
662 related_dependencies.append((package_label, model_name, None, True))
663 for index in indexes:
664 self.add_operation(
665 package_label,
666 operations.AddIndex(
667 model_name=model_name,
668 index=index,
669 ),
670 dependencies=related_dependencies,
671 )
672 for constraint in constraints:
673 self.add_operation(
674 package_label,
675 operations.AddConstraint(
676 model_name=model_name,
677 constraint=constraint,
678 ),
679 dependencies=related_dependencies,
680 )
681
682 def generate_deleted_models(self) -> None:
683 """
684 Find all deleted models and make delete
685 operations for them as well as separate operations to delete any
686 foreign key or M2M relationships (these are optimized later, if
687 possible).
688
689 Also bring forward removal of any model options that refer to
690 collections of fields - the inverse of generate_created_models().
691 """
692 deleted_models = self.old_model_keys - self.new_model_keys
693
694 for package_label, model_name in sorted(deleted_models):
695 model_state = self.from_state.models[package_label, model_name]
696 # Gather related fields
697 related_fields = {}
698 for field_name, field in model_state.fields.items():
699 if field.remote_field:
700 if field.remote_field.model:
701 related_fields[field_name] = field
702 if getattr(field.remote_field, "through", None):
703 related_fields[field_name] = field
704
705 # Then remove each related field
706 for name in sorted(related_fields):
707 self.add_operation(
708 package_label,
709 operations.RemoveField(
710 model_name=model_name,
711 name=name,
712 ),
713 )
714 # Finally, remove the model.
715 # This depends on both the removal/alteration of all incoming fields
716 # and the removal of all its own related fields, and if it's
717 # a through model the field that references it.
718 dependencies = []
719 relations = self.from_state.relations
720 for (
721 related_object_package_label,
722 object_name,
723 ), relation_related_fields in relations[package_label, model_name].items():
724 for field_name, field in relation_related_fields.items():
725 dependencies.append(
726 (related_object_package_label, object_name, field_name, False),
727 )
728 if not field.many_to_many:
729 dependencies.append(
730 (
731 related_object_package_label,
732 object_name,
733 field_name,
734 "alter",
735 ),
736 )
737
738 for name in sorted(related_fields):
739 dependencies.append((package_label, model_name, name, False))
740 # We're referenced in another field's through=
741 through_user = self.through_users.get(
742 (package_label, model_state.name_lower)
743 )
744 if through_user:
745 dependencies.append(
746 (through_user[0], through_user[1], through_user[2], False)
747 )
748 # Finally, make the operation, deduping any dependencies
749 self.add_operation(
750 package_label,
751 operations.DeleteModel(
752 name=model_state.name,
753 ),
754 dependencies=list(set(dependencies)),
755 )
756
757 def create_renamed_fields(self) -> None:
758 """Work out renamed fields."""
759 self.renamed_operations = []
760 old_field_keys = self.old_field_keys.copy()
761 for package_label, model_name, field_name in sorted(
762 self.new_field_keys - old_field_keys
763 ):
764 old_model_name = self.renamed_models.get(
765 (package_label, model_name), model_name
766 )
767 old_model_state = self.from_state.models[package_label, old_model_name]
768 new_model_state = self.to_state.models[package_label, model_name]
769 field = new_model_state.get_field(field_name)
770 # Scan to see if this is actually a rename!
771 field_dec = self.deep_deconstruct(field)
772 for rem_package_label, rem_model_name, rem_field_name in sorted(
773 old_field_keys - self.new_field_keys
774 ):
775 if rem_package_label == package_label and rem_model_name == model_name:
776 old_field = old_model_state.get_field(rem_field_name)
777 old_field_dec = self.deep_deconstruct(old_field)
778 if (
779 field.remote_field
780 and field.remote_field.model
781 and "to" in old_field_dec[2]
782 ):
783 old_rel_to = old_field_dec[2]["to"]
784 if old_rel_to in self.renamed_models_rel:
785 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
786 old_field.set_attributes_from_name(rem_field_name)
787 old_db_column = old_field.get_attname_column()[1]
788 if old_field_dec == field_dec or (
789 # Was the field renamed and db_column equal to the
790 # old field's column added?
791 old_field_dec[0:2] == field_dec[0:2]
792 and dict(old_field_dec[2], db_column=old_db_column)
793 == field_dec[2]
794 ):
795 if self.questioner.ask_rename(
796 model_name, rem_field_name, field_name, field
797 ):
798 self.renamed_operations.append(
799 (
800 rem_package_label,
801 rem_model_name,
802 old_field.db_column,
803 rem_field_name,
804 package_label,
805 model_name,
806 field,
807 field_name,
808 )
809 )
810 old_field_keys.remove(
811 (rem_package_label, rem_model_name, rem_field_name)
812 )
813 old_field_keys.add((package_label, model_name, field_name))
814 self.renamed_fields[
815 package_label, model_name, field_name
816 ] = rem_field_name
817 break
818
819 def generate_renamed_fields(self) -> None:
820 """Generate RenameField operations."""
821 for (
822 rem_package_label,
823 rem_model_name,
824 rem_db_column,
825 rem_field_name,
826 package_label,
827 model_name,
828 field,
829 field_name,
830 ) in self.renamed_operations:
831 # A db_column mismatch requires a prior noop AlterField for the
832 # subsequent RenameField to be a noop on attempts at preserving the
833 # old name.
834 if rem_db_column != field.db_column:
835 altered_field = field.clone()
836 altered_field.name = rem_field_name
837 self.add_operation(
838 package_label,
839 operations.AlterField(
840 model_name=model_name,
841 name=rem_field_name,
842 field=altered_field,
843 ),
844 )
845 self.add_operation(
846 package_label,
847 operations.RenameField(
848 model_name=model_name,
849 old_name=rem_field_name,
850 new_name=field_name,
851 ),
852 )
853 self.old_field_keys.remove(
854 (rem_package_label, rem_model_name, rem_field_name)
855 )
856 self.old_field_keys.add((package_label, model_name, field_name))
857
858 def generate_added_fields(self) -> None:
859 """Make AddField operations."""
860 for package_label, model_name, field_name in sorted(
861 self.new_field_keys - self.old_field_keys
862 ):
863 self._generate_added_field(package_label, model_name, field_name)
864
865 def _generate_added_field(
866 self, package_label: str, model_name: str, field_name: str
867 ) -> None:
868 field = self.to_state.models[package_label, model_name].get_field(field_name)
869 # Adding a field always depends at least on its removal.
870 dependencies = [(package_label, model_name, field_name, False)]
871 # Fields that are foreignkeys/m2ms depend on stuff.
872 if field.remote_field and field.remote_field.model:
873 dependencies.extend(
874 self._get_dependencies_for_foreign_key(
875 package_label,
876 model_name,
877 field,
878 self.to_state,
879 )
880 )
881 # You can't just add NOT NULL fields with no default or fields
882 # which don't allow empty strings as default.
883 time_fields = (models.DateField, models.DateTimeField, models.TimeField)
884 preserve_default = (
885 field.allow_null
886 or field.has_default()
887 or field.many_to_many
888 or (not field.required and field.empty_strings_allowed)
889 or (isinstance(field, time_fields) and field.auto_now)
890 )
891 if not preserve_default:
892 field = field.clone()
893 if isinstance(field, time_fields) and field.auto_now_add:
894 field.default = self.questioner.ask_auto_now_add_addition(
895 field_name, model_name
896 )
897 else:
898 field.default = self.questioner.ask_not_null_addition(
899 field_name, model_name
900 )
901 if (
902 field.primary_key
903 and field.default is not models.NOT_PROVIDED
904 and callable(field.default)
905 ):
906 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
907 self.add_operation(
908 package_label,
909 operations.AddField(
910 model_name=model_name,
911 name=field_name,
912 field=field,
913 preserve_default=preserve_default,
914 ),
915 dependencies=dependencies,
916 )
917
918 def generate_removed_fields(self) -> None:
919 """Make RemoveField operations."""
920 for package_label, model_name, field_name in sorted(
921 self.old_field_keys - self.new_field_keys
922 ):
923 self._generate_removed_field(package_label, model_name, field_name)
924
925 def _generate_removed_field(
926 self, package_label: str, model_name: str, field_name: str
927 ) -> None:
928 self.add_operation(
929 package_label,
930 operations.RemoveField(
931 model_name=model_name,
932 name=field_name,
933 ),
934 )
935
936 def generate_altered_fields(self) -> None:
937 """
938 Make AlterField operations, or possibly RemovedField/AddField if alter
939 isn't possible.
940 """
941 for package_label, model_name, field_name in sorted(
942 self.old_field_keys & self.new_field_keys
943 ):
944 # Did the field change?
945 old_model_name = self.renamed_models.get(
946 (package_label, model_name), model_name
947 )
948 old_field_name = self.renamed_fields.get(
949 (package_label, model_name, field_name), field_name
950 )
951 old_field = self.from_state.models[package_label, old_model_name].get_field(
952 old_field_name
953 )
954 new_field = self.to_state.models[package_label, model_name].get_field(
955 field_name
956 )
957 dependencies = []
958 # Implement any model renames on relations; these are handled by RenameModel
959 # so we need to exclude them from the comparison
960 if hasattr(new_field, "remote_field") and getattr(
961 new_field.remote_field, "model", None
962 ):
963 rename_key = resolve_relation(
964 new_field.remote_field.model, package_label, model_name
965 )
966 if rename_key in self.renamed_models:
967 new_field.remote_field.model = old_field.remote_field.model
968 # Handle ForeignKey which can only have a single to_field.
969 remote_field_name = getattr(new_field.remote_field, "field_name", None)
970 if remote_field_name:
971 to_field_rename_key = rename_key + (remote_field_name,)
972 if to_field_rename_key in self.renamed_fields:
973 # Repoint model name only
974 new_field.remote_field.model = old_field.remote_field.model
975 dependencies.extend(
976 self._get_dependencies_for_foreign_key(
977 package_label,
978 model_name,
979 new_field,
980 self.to_state,
981 )
982 )
983 if hasattr(new_field, "remote_field") and getattr(
984 new_field.remote_field, "through", None
985 ):
986 rename_key = resolve_relation(
987 new_field.remote_field.through, package_label, model_name
988 )
989 if rename_key in self.renamed_models:
990 new_field.remote_field.through = old_field.remote_field.through
991 old_field_dec = self.deep_deconstruct(old_field)
992 new_field_dec = self.deep_deconstruct(new_field)
993 # If the field was confirmed to be renamed it means that only
994 # db_column was allowed to change which generate_renamed_fields()
995 # already accounts for by adding an AlterField operation.
996 if old_field_dec != new_field_dec and old_field_name == field_name:
997 both_m2m = old_field.many_to_many and new_field.many_to_many
998 neither_m2m = not old_field.many_to_many and not new_field.many_to_many
999 if both_m2m or neither_m2m:
1000 # Either both fields are m2m or neither is
1001 preserve_default = True
1002 if (
1003 old_field.allow_null
1004 and not new_field.allow_null
1005 and not new_field.has_default()
1006 and not new_field.many_to_many
1007 ):
1008 field = new_field.clone()
1009 new_default = self.questioner.ask_not_null_alteration(
1010 field_name, model_name
1011 )
1012 if new_default is not models.NOT_PROVIDED:
1013 field.default = new_default
1014 preserve_default = False
1015 else:
1016 field = new_field
1017 self.add_operation(
1018 package_label,
1019 operations.AlterField(
1020 model_name=model_name,
1021 name=field_name,
1022 field=field,
1023 preserve_default=preserve_default,
1024 ),
1025 dependencies=dependencies,
1026 )
1027 else:
1028 # We cannot alter between m2m and concrete fields
1029 self._generate_removed_field(package_label, model_name, field_name)
1030 self._generate_added_field(package_label, model_name, field_name)
1031
1032 def create_altered_indexes(self) -> None:
1033 option_name = operations.AddIndex.option_name
1034
1035 for package_label, model_name in sorted(self.kept_model_keys):
1036 old_model_name = self.renamed_models.get(
1037 (package_label, model_name), model_name
1038 )
1039 old_model_state = self.from_state.models[package_label, old_model_name]
1040 new_model_state = self.to_state.models[package_label, model_name]
1041
1042 old_indexes = old_model_state.options[option_name]
1043 new_indexes = new_model_state.options[option_name]
1044 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1045 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1046 renamed_indexes = []
1047 # Find renamed indexes.
1048 remove_from_added = []
1049 remove_from_removed = []
1050 for new_index in added_indexes:
1051 new_index_dec = new_index.deconstruct()
1052 new_index_name = new_index_dec[2].pop("name")
1053 for old_index in removed_indexes:
1054 old_index_dec = old_index.deconstruct()
1055 old_index_name = old_index_dec[2].pop("name")
1056 # Indexes are the same except for the names.
1057 if (
1058 new_index_dec == old_index_dec
1059 and new_index_name != old_index_name
1060 ):
1061 renamed_indexes.append((old_index_name, new_index_name, None))
1062 remove_from_added.append(new_index)
1063 remove_from_removed.append(old_index)
1064
1065 # Remove renamed indexes from the lists of added and removed
1066 # indexes.
1067 added_indexes = [
1068 idx for idx in added_indexes if idx not in remove_from_added
1069 ]
1070 removed_indexes = [
1071 idx for idx in removed_indexes if idx not in remove_from_removed
1072 ]
1073
1074 self.altered_indexes.update(
1075 {
1076 (package_label, model_name): {
1077 "added_indexes": added_indexes,
1078 "removed_indexes": removed_indexes,
1079 "renamed_indexes": renamed_indexes,
1080 }
1081 }
1082 )
1083
1084 def generate_added_indexes(self) -> None:
1085 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1086 dependencies = self._get_dependencies_for_model(package_label, model_name)
1087 for index in alt_indexes["added_indexes"]:
1088 self.add_operation(
1089 package_label,
1090 operations.AddIndex(
1091 model_name=model_name,
1092 index=index,
1093 ),
1094 dependencies=dependencies,
1095 )
1096
1097 def generate_removed_indexes(self) -> None:
1098 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1099 for index in alt_indexes["removed_indexes"]:
1100 self.add_operation(
1101 package_label,
1102 operations.RemoveIndex(
1103 model_name=model_name,
1104 name=index.name,
1105 ),
1106 )
1107
1108 def generate_renamed_indexes(self) -> None:
1109 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1110 for old_index_name, new_index_name, old_fields in alt_indexes[
1111 "renamed_indexes"
1112 ]:
1113 self.add_operation(
1114 package_label,
1115 operations.RenameIndex(
1116 model_name=model_name,
1117 new_name=new_index_name,
1118 old_name=old_index_name,
1119 old_fields=old_fields,
1120 ),
1121 )
1122
1123 def create_altered_constraints(self) -> None:
1124 option_name = operations.AddConstraint.option_name
1125 for package_label, model_name in sorted(self.kept_model_keys):
1126 old_model_name = self.renamed_models.get(
1127 (package_label, model_name), model_name
1128 )
1129 old_model_state = self.from_state.models[package_label, old_model_name]
1130 new_model_state = self.to_state.models[package_label, model_name]
1131
1132 old_constraints = old_model_state.options[option_name]
1133 new_constraints = new_model_state.options[option_name]
1134 add_constraints = [c for c in new_constraints if c not in old_constraints]
1135 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1136
1137 self.altered_constraints.update(
1138 {
1139 (package_label, model_name): {
1140 "added_constraints": add_constraints,
1141 "removed_constraints": rem_constraints,
1142 }
1143 }
1144 )
1145
1146 def generate_added_constraints(self) -> None:
1147 for (
1148 package_label,
1149 model_name,
1150 ), alt_constraints in self.altered_constraints.items():
1151 dependencies = self._get_dependencies_for_model(package_label, model_name)
1152 for constraint in alt_constraints["added_constraints"]:
1153 self.add_operation(
1154 package_label,
1155 operations.AddConstraint(
1156 model_name=model_name,
1157 constraint=constraint,
1158 ),
1159 dependencies=dependencies,
1160 )
1161
1162 def generate_removed_constraints(self) -> None:
1163 for (
1164 package_label,
1165 model_name,
1166 ), alt_constraints in self.altered_constraints.items():
1167 for constraint in alt_constraints["removed_constraints"]:
1168 self.add_operation(
1169 package_label,
1170 operations.RemoveConstraint(
1171 model_name=model_name,
1172 name=constraint.name,
1173 ),
1174 )
1175
1176 @staticmethod
1177 def _get_dependencies_for_foreign_key(
1178 package_label: str, model_name: str, field: Field, project_state: ProjectState
1179 ) -> list[tuple[str, str, None, bool]]:
1180 remote_field_model = None
1181 if hasattr(field.remote_field, "model"):
1182 remote_field_model = field.remote_field.model
1183 else:
1184 relations = project_state.relations[package_label, model_name]
1185 for (remote_package_label, remote_model_name), fields in relations.items():
1186 if any(
1187 field == related_field.remote_field
1188 for related_field in fields.values()
1189 ):
1190 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1191 break
1192 dep_package_label, dep_object_name = resolve_relation(
1193 remote_field_model,
1194 package_label,
1195 model_name,
1196 )
1197 dependencies = [(dep_package_label, dep_object_name, None, True)]
1198 if getattr(field.remote_field, "through", None):
1199 through_package_label, through_object_name = resolve_relation(
1200 field.remote_field.through, # type: ignore[attr-defined]
1201 package_label,
1202 model_name,
1203 )
1204 dependencies.append(
1205 (through_package_label, through_object_name, None, True)
1206 )
1207 return dependencies
1208
1209 def _get_dependencies_for_model(
1210 self, package_label: str, model_name: str
1211 ) -> list[tuple[str, str, None, bool]]:
1212 """Return foreign key dependencies of the given model."""
1213 dependencies = []
1214 model_state = self.to_state.models[package_label, model_name]
1215 for field in model_state.fields.values():
1216 if field.is_relation:
1217 dependencies.extend(
1218 self._get_dependencies_for_foreign_key(
1219 package_label,
1220 model_name,
1221 field,
1222 self.to_state,
1223 )
1224 )
1225 return dependencies
1226
1227 def generate_altered_db_table(self) -> None:
1228 for package_label, model_name in sorted(self.kept_model_keys):
1229 old_model_name = self.renamed_models.get(
1230 (package_label, model_name), model_name
1231 )
1232 old_model_state = self.from_state.models[package_label, old_model_name]
1233 new_model_state = self.to_state.models[package_label, model_name]
1234 old_db_table_name = old_model_state.options.get("db_table")
1235 new_db_table_name = new_model_state.options.get("db_table")
1236 if old_db_table_name != new_db_table_name:
1237 self.add_operation(
1238 package_label,
1239 operations.AlterModelTable(
1240 name=model_name,
1241 table=new_db_table_name,
1242 ),
1243 )
1244
1245 def generate_altered_db_table_comment(self) -> None:
1246 for package_label, model_name in sorted(self.kept_model_keys):
1247 old_model_name = self.renamed_models.get(
1248 (package_label, model_name), model_name
1249 )
1250 old_model_state = self.from_state.models[package_label, old_model_name]
1251 new_model_state = self.to_state.models[package_label, model_name]
1252
1253 old_db_table_comment = old_model_state.options.get("db_table_comment")
1254 new_db_table_comment = new_model_state.options.get("db_table_comment")
1255 if old_db_table_comment != new_db_table_comment:
1256 self.add_operation(
1257 package_label,
1258 operations.AlterModelTableComment(
1259 name=model_name,
1260 table_comment=new_db_table_comment,
1261 ),
1262 )
1263
1264 def generate_altered_options(self) -> None:
1265 """
1266 Work out if any non-schema-affecting options have changed and make an
1267 operation to represent them in state changes (in case Python code in
1268 migrations needs them).
1269 """
1270 for package_label, model_name in sorted(self.kept_model_keys):
1271 old_model_name = self.renamed_models.get(
1272 (package_label, model_name), model_name
1273 )
1274 old_model_state = self.from_state.models[package_label, old_model_name]
1275 new_model_state = self.to_state.models[package_label, model_name]
1276 old_options = {
1277 key: value
1278 for key, value in old_model_state.options.items()
1279 if key in AlterModelOptions.ALTER_OPTION_KEYS
1280 }
1281 new_options = {
1282 key: value
1283 for key, value in new_model_state.options.items()
1284 if key in AlterModelOptions.ALTER_OPTION_KEYS
1285 }
1286 if old_options != new_options:
1287 self.add_operation(
1288 package_label,
1289 operations.AlterModelOptions(
1290 name=model_name,
1291 options=new_options,
1292 ),
1293 )
1294
1295 def arrange_for_graph(
1296 self,
1297 changes: dict[str, list[Migration]],
1298 graph: MigrationGraph,
1299 migration_name: str | None = None,
1300 ) -> dict[str, list[Migration]]:
1301 """
1302 Take a result from changes() and a MigrationGraph, and fix the names
1303 and dependencies of the changes so they extend the graph from the leaf
1304 nodes for each app.
1305 """
1306 leaves = graph.leaf_nodes()
1307 name_map = {}
1308 for package_label, migrations in list(changes.items()):
1309 if not migrations:
1310 continue
1311 # Find the app label's current leaf node
1312 app_leaf = None
1313 for leaf in leaves:
1314 if leaf[0] == package_label:
1315 app_leaf = leaf
1316 break
1317 # Do they want an initial migration for this app?
1318 if app_leaf is None and not self.questioner.ask_initial(package_label):
1319 # They don't.
1320 for migration in migrations:
1321 name_map[(package_label, migration.name)] = (
1322 package_label,
1323 "__first__",
1324 )
1325 del changes[package_label]
1326 continue
1327 # Work out the next number in the sequence
1328 if app_leaf is None:
1329 next_number = 1
1330 else:
1331 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1332 # Name each migration
1333 for i, migration in enumerate(migrations):
1334 if i == 0 and app_leaf:
1335 migration.dependencies.append(app_leaf)
1336 new_name_parts = ["%04i" % next_number] # noqa: UP031
1337 if migration_name:
1338 new_name_parts.append(migration_name)
1339 elif i == 0 and not app_leaf:
1340 new_name_parts.append("initial")
1341 else:
1342 new_name_parts.append(migration.suggest_name()[:100])
1343 new_name = "_".join(new_name_parts)
1344 name_map[(package_label, migration.name)] = (package_label, new_name)
1345 next_number += 1
1346 migration.name = new_name
1347 # Now fix dependencies
1348 for migrations in changes.values():
1349 for migration in migrations:
1350 migration.dependencies = [
1351 name_map.get(d, d) for d in migration.dependencies
1352 ]
1353 return changes
1354
1355 def _trim_to_packages(
1356 self, changes: dict[str, list[Migration]], package_labels: set[str]
1357 ) -> dict[str, list[Migration]]:
1358 """
1359 Take changes from arrange_for_graph() and set of app labels, and return
1360 a modified set of changes which trims out as many migrations that are
1361 not in package_labels as possible. Note that some other migrations may
1362 still be present as they may be required dependencies.
1363 """
1364 # Gather other app dependencies in a first pass
1365 app_dependencies = {}
1366 for package_label, migrations in changes.items():
1367 for migration in migrations:
1368 for dep_package_label, name in migration.dependencies:
1369 app_dependencies.setdefault(package_label, set()).add(
1370 dep_package_label
1371 )
1372 required_packages = set(package_labels)
1373 # Keep resolving till there's no change
1374 old_required_packages = None
1375 while old_required_packages != required_packages:
1376 old_required_packages = set(required_packages)
1377 required_packages.update(
1378 *[
1379 app_dependencies.get(package_label, ())
1380 for package_label in required_packages
1381 ]
1382 )
1383 # Remove all migrations that aren't needed
1384 for package_label in list(changes):
1385 if package_label not in required_packages:
1386 del changes[package_label]
1387 return changes
1388
1389 @classmethod
1390 def parse_number(cls, name: str) -> int | None:
1391 """
1392 Given a migration name, try to extract a number from the beginning of
1393 it. For a squashed migration such as '0001_squashed_0004…', return the
1394 second number. If no number is found, return None.
1395 """
1396 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1397 return int(squashed_match[1])
1398 match = re.match(r"^\d+", name)
1399 if match:
1400 return int(match[0])
1401 return None