1from __future__ import annotations
2
3import functools
4import re
5from graphlib import TopologicalSorter
6from typing import TYPE_CHECKING, Any
7
8from plain.models.fields import (
9 NOT_PROVIDED,
10 DateField,
11 DateTimeField,
12 Field,
13 TimeField,
14)
15from plain.models.fields.related import ManyToManyField, RelatedField
16from plain.models.fields.reverse_related import ManyToManyRel
17from plain.models.migrations import operations
18from plain.models.migrations.migration import Migration, SettingsTuple
19from plain.models.migrations.operations.models import AlterModelOptions
20from plain.models.migrations.optimizer import MigrationOptimizer
21from plain.models.migrations.questioner import MigrationQuestioner
22from plain.models.migrations.utils import (
23 COMPILED_REGEX_TYPE,
24 RegexObject,
25 resolve_relation,
26)
27from plain.runtime import settings
28
29if TYPE_CHECKING:
30 from plain.models.migrations.graph import MigrationGraph
31 from plain.models.migrations.operations.base import Operation
32 from plain.models.migrations.state import ProjectState
33
34
35class MigrationAutodetector:
36 """
37 Take a pair of ProjectStates and compare them to see what the first would
38 need doing to make it match the second (the second usually being the
39 project's current state).
40
41 Note that this naturally operates on entire projects at a time,
42 as it's likely that changes interact (for example, you can't
43 add a ForeignKeyField without having a migration to add the table it
44 depends on first). A user interface may offer single-app usage
45 if it wishes, with the caveat that it may not always be possible.
46 """
47
48 def __init__(
49 self,
50 from_state: ProjectState,
51 to_state: ProjectState,
52 questioner: MigrationQuestioner | None = None,
53 ):
54 self.from_state = from_state
55 self.to_state = to_state
56 self.questioner = questioner or MigrationQuestioner()
57 self.existing_packages = {app for app, model in from_state.models}
58
59 def changes(
60 self,
61 graph: MigrationGraph,
62 trim_to_packages: set[str] | None = None,
63 convert_packages: set[str] | None = None,
64 migration_name: str | None = None,
65 ) -> dict[str, list[Migration]]:
66 """
67 Main entry point to produce a list of applicable changes.
68 Take a graph to base names on and an optional set of packages
69 to try and restrict to (restriction is not guaranteed)
70 """
71 changes = self._detect_changes(convert_packages, graph)
72 changes = self.arrange_for_graph(changes, graph, migration_name)
73 if trim_to_packages:
74 changes = self._trim_to_packages(changes, trim_to_packages)
75 return changes
76
77 def deep_deconstruct(self, obj: Any) -> Any:
78 """
79 Recursive deconstruction for a field and its arguments.
80 Used for full comparison for rename/alter; sometimes a single-level
81 deconstruction will not compare correctly.
82 """
83 if isinstance(obj, list):
84 return [self.deep_deconstruct(value) for value in obj]
85 elif isinstance(obj, tuple):
86 return tuple(self.deep_deconstruct(value) for value in obj)
87 elif isinstance(obj, dict):
88 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
89 elif isinstance(obj, functools.partial):
90 return (
91 obj.func,
92 self.deep_deconstruct(obj.args),
93 self.deep_deconstruct(obj.keywords),
94 )
95 elif isinstance(obj, COMPILED_REGEX_TYPE):
96 return RegexObject(obj)
97 elif isinstance(obj, type):
98 # If this is a type that implements 'deconstruct' as an instance method,
99 # avoid treating this as being deconstructible itself - see #22951
100 return obj
101 elif hasattr(obj, "deconstruct"):
102 deconstructed = obj.deconstruct()
103 if isinstance(obj, Field):
104 # we have a field which also returns a name
105 deconstructed = deconstructed[1:]
106 path, args, kwargs = deconstructed
107 return (
108 path,
109 [self.deep_deconstruct(value) for value in args],
110 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
111 )
112 else:
113 return obj
114
115 def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
116 """
117 Return a definition of the fields that ignores field names and
118 what related fields actually relate to. Used for detecting renames (as
119 the related fields change during renames).
120 """
121 fields_def = []
122 for name, field in sorted(fields.items()):
123 deconstruction = self.deep_deconstruct(field)
124 if isinstance(field, RelatedField) and field.remote_field.model:
125 deconstruction[2].pop("to", None)
126 fields_def.append(deconstruction)
127 return fields_def
128
129 def _detect_changes(
130 self,
131 convert_packages: set[str] | None = None,
132 graph: MigrationGraph | None = None,
133 ) -> dict[str, list[Migration]]:
134 """
135 Return a dict of migration plans which will achieve the
136 change from from_state to to_state. The dict has app labels
137 as keys and a list of migrations as values.
138
139 The resulting migrations aren't specially named, but the names
140 do matter for dependencies inside the set.
141
142 convert_packages is the list of packages to convert to use migrations
143 (i.e. to make initial migrations for, in the usual case)
144
145 graph is an optional argument that, if provided, can help improve
146 dependency generation and avoid potential circular dependencies.
147 """
148 # The first phase is generating all the operations for each app
149 # and gathering them into a big per-app list.
150 # Then go through that list, order it, and split into migrations to
151 # resolve dependencies caused by M2Ms and FKs.
152 self.generated_operations = {}
153 self.altered_indexes = {}
154 self.altered_constraints = {}
155 self.renamed_fields = {}
156
157 # Prepare some old/new state and model lists, ignoring unmigrated packages.
158 self.old_model_keys = set()
159 self.new_model_keys = set()
160 for (package_label, model_name), model_state in self.from_state.models.items():
161 if package_label not in self.from_state.real_packages:
162 self.old_model_keys.add((package_label, model_name))
163
164 for (package_label, model_name), model_state in self.to_state.models.items():
165 if package_label not in self.from_state.real_packages or (
166 convert_packages and package_label in convert_packages
167 ):
168 self.new_model_keys.add((package_label, model_name))
169
170 self.from_state.resolve_fields_and_relations()
171 self.to_state.resolve_fields_and_relations()
172
173 # Renames have to come first
174 self.generate_renamed_models()
175
176 # Prepare lists of fields and generate through model map
177 self._prepare_field_lists()
178 self._generate_through_model_map()
179
180 # Generate non-rename model operations
181 self.generate_deleted_models()
182 self.generate_created_models()
183 self.generate_altered_options()
184 self.generate_altered_db_table_comment()
185
186 # Create the renamed fields and store them in self.renamed_fields.
187 # They are used by create_altered_indexes(), generate_altered_fields(),
188 # generate_removed_altered_index(), and
189 # generate_altered_index().
190 self.create_renamed_fields()
191 # Create the altered indexes and store them in self.altered_indexes.
192 # This avoids the same computation in generate_removed_indexes()
193 # and generate_added_indexes().
194 self.create_altered_indexes()
195 self.create_altered_constraints()
196 # Generate index removal operations before field is removed
197 self.generate_removed_constraints()
198 self.generate_removed_indexes()
199 # Generate field renaming operations.
200 self.generate_renamed_fields()
201 self.generate_renamed_indexes()
202 # Generate field operations.
203 self.generate_removed_fields()
204 self.generate_added_fields()
205 self.generate_altered_fields()
206 self.generate_added_indexes()
207 self.generate_added_constraints()
208 self.generate_altered_db_table()
209
210 self._sort_migrations()
211 self._build_migration_list(graph)
212 self._optimize_migrations()
213
214 return self.migrations
215
216 def _prepare_field_lists(self) -> None:
217 """
218 Prepare field lists and a list of the fields that used through models
219 in the old state so dependencies can be made from the through model
220 deletion to the field that uses it.
221 """
222 self.kept_model_keys = self.old_model_keys & self.new_model_keys
223 self.through_users = {}
224 self.old_field_keys = {
225 (package_label, model_name, field_name)
226 for package_label, model_name in self.kept_model_keys
227 for field_name in self.from_state.models[
228 package_label,
229 self.renamed_models.get((package_label, model_name), model_name),
230 ].fields
231 }
232 self.new_field_keys = {
233 (package_label, model_name, field_name)
234 for package_label, model_name in self.kept_model_keys
235 for field_name in self.to_state.models[package_label, model_name].fields
236 }
237
238 def _generate_through_model_map(self) -> None:
239 """Through model map generation."""
240 for package_label, model_name in sorted(self.old_model_keys):
241 old_model_name = self.renamed_models.get(
242 (package_label, model_name), model_name
243 )
244 old_model_state = self.from_state.models[package_label, old_model_name]
245 for field_name, field in old_model_state.fields.items():
246 if hasattr(field, "remote_field") and getattr(
247 field.remote_field, "through", None
248 ):
249 through_key = resolve_relation(
250 field.remote_field.through, package_label, model_name
251 )
252 self.through_users[through_key] = (
253 package_label,
254 old_model_name,
255 field_name,
256 )
257
258 @staticmethod
259 def _resolve_dependency(
260 dependency: tuple[str, str, str | None, bool | str],
261 ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
262 """
263 Return the resolved dependency and a boolean denoting whether or not
264 it was a settings dependency.
265 """
266 if not isinstance(dependency, SettingsTuple):
267 return dependency, False
268 resolved_package_label, resolved_object_name = getattr(
269 settings, dependency[1]
270 ).split(".")
271 return (resolved_package_label, resolved_object_name.lower()) + dependency[
272 2:
273 ], True
274
275 def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
276 """
277 Chop the lists of operations up into migrations with dependencies on
278 each other. Do this by going through an app's list of operations until
279 one is found that has an outgoing dependency that isn't in another
280 app's migration yet (hasn't been chopped off its list). Then chop off
281 the operations before it into a migration and move onto the next app.
282 If the loops completes without doing anything, there's a circular
283 dependency (which _should_ be impossible as the operations are
284 all split at this point so they can't depend and be depended on).
285 """
286 self.migrations = {}
287 num_ops = sum(len(x) for x in self.generated_operations.values())
288 chop_mode = False
289 while num_ops:
290 # On every iteration, we step through all the packages and see if there
291 # is a completed set of operations.
292 # If we find that a subset of the operations are complete we can
293 # try to chop it off from the rest and continue, but we only
294 # do this if we've already been through the list once before
295 # without any chopping and nothing has changed.
296 for package_label in sorted(self.generated_operations):
297 chopped = []
298 dependencies = set()
299 for operation in list(self.generated_operations[package_label]):
300 deps_satisfied = True
301 operation_dependencies = set()
302 for dep in operation._auto_deps:
303 # Temporarily resolve the settings dependency to
304 # prevent circular references. While keeping the
305 # dependency checks on the resolved model, add the
306 # settings dependencies.
307 original_dep = dep
308 dep, is_settings_dep = self._resolve_dependency(dep)
309 if dep[0] != package_label:
310 # External app dependency. See if it's not yet
311 # satisfied.
312 for other_operation in self.generated_operations.get(
313 dep[0], []
314 ):
315 if self.check_dependency(other_operation, dep):
316 deps_satisfied = False
317 break
318 if not deps_satisfied:
319 break
320 else:
321 if is_settings_dep:
322 operation_dependencies.add(
323 (original_dep[0], original_dep[1])
324 )
325 elif dep[0] in self.migrations:
326 operation_dependencies.add(
327 (dep[0], self.migrations[dep[0]][-1].name)
328 )
329 else:
330 # If we can't find the other app, we add a
331 # first/last dependency, but only if we've
332 # already been through once and checked
333 # everything.
334 if chop_mode:
335 # If the app already exists, we add a
336 # dependency on the last migration, as
337 # we don't know which migration
338 # contains the target field. If it's
339 # not yet migrated or has no
340 # migrations, we use __first__.
341 if graph and graph.leaf_nodes(dep[0]):
342 operation_dependencies.add(
343 graph.leaf_nodes(dep[0])[0]
344 )
345 else:
346 operation_dependencies.add(
347 (dep[0], "__first__")
348 )
349 else:
350 deps_satisfied = False
351 if deps_satisfied:
352 chopped.append(operation)
353 dependencies.update(operation_dependencies)
354 del self.generated_operations[package_label][0]
355 else:
356 break
357 # Make a migration! Well, only if there's stuff to put in it
358 if dependencies or chopped:
359 if not self.generated_operations[package_label] or chop_mode:
360 subclass = type(
361 "Migration",
362 (Migration,),
363 {"operations": [], "dependencies": []},
364 )
365 instance = subclass(
366 "auto_%i" # noqa: UP031
367 % (len(self.migrations.get(package_label, [])) + 1),
368 package_label,
369 )
370 instance.dependencies = list(dependencies)
371 instance.operations = chopped
372 instance.initial = package_label not in self.existing_packages
373 self.migrations.setdefault(package_label, []).append(instance)
374 chop_mode = False
375 else:
376 self.generated_operations[package_label] = (
377 chopped + self.generated_operations[package_label]
378 )
379 new_num_ops = sum(len(x) for x in self.generated_operations.values())
380 if new_num_ops == num_ops:
381 if not chop_mode:
382 chop_mode = True
383 else:
384 raise ValueError(
385 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
386 )
387 num_ops = new_num_ops
388
389 def _sort_migrations(self) -> None:
390 """
391 Reorder to make things possible. Reordering may be needed so FKs work
392 nicely inside the same app.
393 """
394 for package_label, ops in sorted(self.generated_operations.items()):
395 ts = TopologicalSorter()
396 for op in ops:
397 ts.add(op)
398 for dep in op._auto_deps:
399 # Resolve intra-app dependencies to handle circular
400 # references involving a settings model.
401 dep = self._resolve_dependency(dep)[0]
402 if dep[0] != package_label:
403 continue
404 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
405 self.generated_operations[package_label] = list(ts.static_order())
406
407 def _optimize_migrations(self) -> None:
408 # Add in internal dependencies among the migrations
409 for package_label, migrations in self.migrations.items():
410 for m1, m2 in zip(migrations, migrations[1:]):
411 m2.dependencies.append((package_label, m1.name))
412
413 # De-dupe dependencies
414 for migrations in self.migrations.values():
415 for migration in migrations:
416 migration.dependencies = list(set(migration.dependencies))
417
418 # Optimize migrations
419 for package_label, migrations in self.migrations.items():
420 for migration in migrations:
421 migration.operations = MigrationOptimizer().optimize(
422 migration.operations, package_label
423 )
424
425 def check_dependency(
426 self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
427 ) -> bool:
428 """
429 Return True if the given operation depends on the given dependency,
430 False otherwise.
431 """
432 # Created model
433 if dependency[2] is None and dependency[3] is True:
434 return (
435 isinstance(operation, operations.CreateModel)
436 and operation.name_lower == dependency[1].lower()
437 )
438 # Created field
439 elif dependency[2] is not None and dependency[3] is True:
440 return (
441 isinstance(operation, operations.CreateModel)
442 and operation.name_lower == dependency[1].lower()
443 and any(dependency[2] == x for x, y in operation.fields)
444 ) or (
445 isinstance(operation, operations.AddField)
446 and operation.model_name_lower == dependency[1].lower()
447 and operation.name_lower == dependency[2].lower()
448 )
449 # Removed field
450 elif dependency[2] is not None and dependency[3] is False:
451 return (
452 isinstance(operation, operations.RemoveField)
453 and operation.model_name_lower == dependency[1].lower()
454 and operation.name_lower == dependency[2].lower()
455 )
456 # Removed model
457 elif dependency[2] is None and dependency[3] is False:
458 return (
459 isinstance(operation, operations.DeleteModel)
460 and operation.name_lower == dependency[1].lower()
461 )
462 # Field being altered
463 elif dependency[2] is not None and dependency[3] == "alter":
464 return (
465 isinstance(operation, operations.AlterField)
466 and operation.model_name_lower == dependency[1].lower()
467 and operation.name_lower == dependency[2].lower()
468 )
469 # Unknown dependency. Raise an error.
470 else:
471 raise ValueError(f"Can't handle dependency {dependency!r}")
472
473 def add_operation(
474 self,
475 package_label: str,
476 operation: Operation,
477 dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
478 beginning: bool = False,
479 ) -> None:
480 # Operation dependencies are 4-element tuples:
481 # (package_label, model_name, field_name, create/delete as True/False or "alter")
482 operation._auto_deps = dependencies or []
483 if beginning:
484 self.generated_operations.setdefault(package_label, []).insert(0, operation)
485 else:
486 self.generated_operations.setdefault(package_label, []).append(operation)
487
488 def generate_renamed_models(self) -> None:
489 """
490 Find any renamed models, generate the operations for them, and remove
491 the old entry from the model lists. Must be run before other
492 model-level generation.
493 """
494 self.renamed_models = {}
495 self.renamed_models_rel = {}
496 added_models = self.new_model_keys - self.old_model_keys
497 for package_label, model_name in sorted(added_models):
498 model_state = self.to_state.models[package_label, model_name]
499 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
500
501 removed_models = self.old_model_keys - self.new_model_keys
502 for rem_package_label, rem_model_name in removed_models:
503 if rem_package_label == package_label:
504 rem_model_state = self.from_state.models[
505 rem_package_label, rem_model_name
506 ]
507 rem_model_fields_def = self.only_relation_agnostic_fields(
508 rem_model_state.fields
509 )
510 if model_fields_def == rem_model_fields_def:
511 if self.questioner.ask_rename_model(
512 rem_model_state, model_state
513 ):
514 dependencies = []
515 fields = list(model_state.fields.values()) + [
516 field.remote_field
517 for relations in self.to_state.relations[
518 package_label, model_name
519 ].values()
520 for field in relations.values()
521 if isinstance(field, RelatedField)
522 ]
523 for field in fields:
524 if isinstance(field, RelatedField):
525 dependencies.extend(
526 self._get_dependencies_for_foreign_key(
527 package_label,
528 model_name,
529 field,
530 self.to_state,
531 )
532 )
533 self.add_operation(
534 package_label,
535 operations.RenameModel(
536 old_name=rem_model_state.name,
537 new_name=model_state.name,
538 ),
539 dependencies=dependencies,
540 )
541 self.renamed_models[package_label, model_name] = (
542 rem_model_name
543 )
544 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
545 self.renamed_models_rel[renamed_models_rel_key] = (
546 f"{model_state.package_label}.{model_state.name_lower}"
547 )
548 self.old_model_keys.remove(
549 (rem_package_label, rem_model_name)
550 )
551 self.old_model_keys.add((package_label, model_name))
552 break
553
554 def generate_created_models(self) -> None:
555 """
556 Find all new models and make create
557 operations for them as well as separate operations to create any
558 foreign key or M2M relationships (these are optimized later, if
559 possible).
560
561 Defer any model options that refer to collections of fields that might
562 be deferred.
563 """
564 added_models = self.new_model_keys - self.old_model_keys
565
566 for package_label, model_name in added_models:
567 model_state = self.to_state.models[package_label, model_name]
568 # Gather related fields
569 related_fields = {}
570 primary_key_rel = None
571 for field_name, field in model_state.fields.items():
572 if isinstance(field, RelatedField):
573 if field.remote_field.model:
574 if field.primary_key:
575 primary_key_rel = field.remote_field.model
576 else:
577 related_fields[field_name] = field
578 if isinstance(field.remote_field, ManyToManyRel):
579 related_fields[field_name] = field
580
581 # Are there indexes to defer?
582 indexes = model_state.options.pop("indexes")
583 constraints = model_state.options.pop("constraints")
584 # Depend on the deletion of any possible proxy version of us
585 dependencies = [
586 (package_label, model_name, None, False),
587 ]
588 # Depend on all bases
589 for base in model_state.bases:
590 if isinstance(base, str) and "." in base:
591 base_package_label, base_name = base.split(".", 1)
592 dependencies.append((base_package_label, base_name, None, True))
593 # Depend on the removal of base fields if the new model has
594 # a field with the same name.
595 old_base_model_state = self.from_state.models.get(
596 (base_package_label, base_name)
597 )
598 new_base_model_state = self.to_state.models.get(
599 (base_package_label, base_name)
600 )
601 if old_base_model_state and new_base_model_state:
602 removed_base_fields = (
603 set(old_base_model_state.fields)
604 .difference(
605 new_base_model_state.fields,
606 )
607 .intersection(model_state.fields)
608 )
609 for removed_base_field in removed_base_fields:
610 dependencies.append(
611 (
612 base_package_label,
613 base_name,
614 removed_base_field,
615 False,
616 )
617 )
618 # Depend on the other end of the primary key if it's a relation
619 if primary_key_rel:
620 dependencies.append(
621 resolve_relation(
622 primary_key_rel,
623 package_label,
624 model_name,
625 )
626 + (None, True)
627 )
628 # Generate creation operation
629 self.add_operation(
630 package_label,
631 operations.CreateModel(
632 name=model_state.name,
633 fields=[
634 d
635 for d in model_state.fields.items()
636 if d[0] not in related_fields
637 ],
638 options=model_state.options,
639 bases=model_state.bases,
640 ),
641 dependencies=dependencies,
642 beginning=True,
643 )
644
645 # Generate operations for each related field
646 for name, field in sorted(related_fields.items()):
647 dependencies = self._get_dependencies_for_foreign_key(
648 package_label,
649 model_name,
650 field,
651 self.to_state,
652 )
653 # Depend on our own model being created
654 dependencies.append((package_label, model_name, None, True))
655 # Make operation
656 self.add_operation(
657 package_label,
658 operations.AddField(
659 model_name=model_name,
660 name=name,
661 field=field,
662 ),
663 dependencies=list(set(dependencies)),
664 )
665
666 related_dependencies = [
667 (package_label, model_name, name, True)
668 for name in sorted(related_fields)
669 ]
670 related_dependencies.append((package_label, model_name, None, True))
671 for index in indexes:
672 self.add_operation(
673 package_label,
674 operations.AddIndex(
675 model_name=model_name,
676 index=index,
677 ),
678 dependencies=related_dependencies,
679 )
680 for constraint in constraints:
681 self.add_operation(
682 package_label,
683 operations.AddConstraint(
684 model_name=model_name,
685 constraint=constraint,
686 ),
687 dependencies=related_dependencies,
688 )
689
690 def generate_deleted_models(self) -> None:
691 """
692 Find all deleted models and make delete
693 operations for them as well as separate operations to delete any
694 foreign key or M2M relationships (these are optimized later, if
695 possible).
696
697 Also bring forward removal of any model options that refer to
698 collections of fields - the inverse of generate_created_models().
699 """
700 deleted_models = self.old_model_keys - self.new_model_keys
701
702 for package_label, model_name in sorted(deleted_models):
703 model_state = self.from_state.models[package_label, model_name]
704 # Gather related fields
705 related_fields = {}
706 for field_name, field in model_state.fields.items():
707 if isinstance(field, RelatedField):
708 if field.remote_field.model:
709 related_fields[field_name] = field
710 if isinstance(field.remote_field, ManyToManyRel):
711 related_fields[field_name] = field
712
713 # Then remove each related field
714 for name in sorted(related_fields):
715 self.add_operation(
716 package_label,
717 operations.RemoveField(
718 model_name=model_name,
719 name=name,
720 ),
721 )
722 # Finally, remove the model.
723 # This depends on both the removal/alteration of all incoming fields
724 # and the removal of all its own related fields, and if it's
725 # a through model the field that references it.
726 dependencies = []
727 relations = self.from_state.relations
728 for (
729 related_object_package_label,
730 object_name,
731 ), relation_related_fields in relations[package_label, model_name].items():
732 for field_name, field in relation_related_fields.items():
733 dependencies.append(
734 (related_object_package_label, object_name, field_name, False),
735 )
736 if not isinstance(field, ManyToManyField):
737 dependencies.append(
738 (
739 related_object_package_label,
740 object_name,
741 field_name,
742 "alter",
743 ),
744 )
745
746 for name in sorted(related_fields):
747 dependencies.append((package_label, model_name, name, False))
748 # We're referenced in another field's through=
749 through_user = self.through_users.get(
750 (package_label, model_state.name_lower)
751 )
752 if through_user:
753 dependencies.append(
754 (through_user[0], through_user[1], through_user[2], False)
755 )
756 # Finally, make the operation, deduping any dependencies
757 self.add_operation(
758 package_label,
759 operations.DeleteModel(
760 name=model_state.name,
761 ),
762 dependencies=list(set(dependencies)),
763 )
764
765 def create_renamed_fields(self) -> None:
766 """Work out renamed fields."""
767 self.renamed_operations = []
768 old_field_keys = self.old_field_keys.copy()
769 for package_label, model_name, field_name in sorted(
770 self.new_field_keys - old_field_keys
771 ):
772 old_model_name = self.renamed_models.get(
773 (package_label, model_name), model_name
774 )
775 old_model_state = self.from_state.models[package_label, old_model_name]
776 new_model_state = self.to_state.models[package_label, model_name]
777 field = new_model_state.get_field(field_name)
778 # Scan to see if this is actually a rename!
779 field_dec = self.deep_deconstruct(field)
780 for rem_package_label, rem_model_name, rem_field_name in sorted(
781 old_field_keys - self.new_field_keys
782 ):
783 if rem_package_label == package_label and rem_model_name == model_name:
784 old_field = old_model_state.get_field(rem_field_name)
785 old_field_dec = self.deep_deconstruct(old_field)
786 if (
787 isinstance(field, RelatedField)
788 and field.remote_field
789 and field.remote_field.model
790 and "to" in old_field_dec[2]
791 ):
792 old_rel_to = old_field_dec[2]["to"]
793 if old_rel_to in self.renamed_models_rel:
794 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
795 old_field.set_attributes_from_name(rem_field_name)
796 old_db_column = old_field.get_attname_column()[1]
797 if old_field_dec == field_dec or (
798 # Was the field renamed and db_column equal to the
799 # old field's column added?
800 old_field_dec[0:2] == field_dec[0:2]
801 and dict(old_field_dec[2], db_column=old_db_column)
802 == field_dec[2]
803 ):
804 if self.questioner.ask_rename(
805 model_name, rem_field_name, field_name, field
806 ):
807 self.renamed_operations.append(
808 (
809 rem_package_label,
810 rem_model_name,
811 old_field.db_column,
812 rem_field_name,
813 package_label,
814 model_name,
815 field,
816 field_name,
817 )
818 )
819 old_field_keys.remove(
820 (rem_package_label, rem_model_name, rem_field_name)
821 )
822 old_field_keys.add((package_label, model_name, field_name))
823 self.renamed_fields[
824 package_label, model_name, field_name
825 ] = rem_field_name
826 break
827
828 def generate_renamed_fields(self) -> None:
829 """Generate RenameField operations."""
830 for (
831 rem_package_label,
832 rem_model_name,
833 rem_db_column,
834 rem_field_name,
835 package_label,
836 model_name,
837 field,
838 field_name,
839 ) in self.renamed_operations:
840 # A db_column mismatch requires a prior noop AlterField for the
841 # subsequent RenameField to be a noop on attempts at preserving the
842 # old name.
843 if rem_db_column != field.db_column:
844 altered_field = field.clone()
845 altered_field.name = rem_field_name
846 self.add_operation(
847 package_label,
848 operations.AlterField(
849 model_name=model_name,
850 name=rem_field_name,
851 field=altered_field,
852 ),
853 )
854 self.add_operation(
855 package_label,
856 operations.RenameField(
857 model_name=model_name,
858 old_name=rem_field_name,
859 new_name=field_name,
860 ),
861 )
862 self.old_field_keys.remove(
863 (rem_package_label, rem_model_name, rem_field_name)
864 )
865 self.old_field_keys.add((package_label, model_name, field_name))
866
867 def generate_added_fields(self) -> None:
868 """Make AddField operations."""
869 for package_label, model_name, field_name in sorted(
870 self.new_field_keys - self.old_field_keys
871 ):
872 self._generate_added_field(package_label, model_name, field_name)
873
874 def _generate_added_field(
875 self, package_label: str, model_name: str, field_name: str
876 ) -> None:
877 field = self.to_state.models[package_label, model_name].get_field(field_name)
878 # Adding a field always depends at least on its removal.
879 dependencies = [(package_label, model_name, field_name, False)]
880 # Fields that are foreignkeys/m2ms depend on stuff.
881 if isinstance(field, RelatedField) and field.remote_field.model:
882 dependencies.extend(
883 self._get_dependencies_for_foreign_key(
884 package_label,
885 model_name,
886 field,
887 self.to_state,
888 )
889 )
890 # You can't just add NOT NULL fields with no default or fields
891 # which don't allow empty strings as default.
892 time_fields = (DateField, DateTimeField, TimeField)
893 preserve_default = (
894 field.allow_null
895 or field.has_default()
896 or isinstance(field, ManyToManyField)
897 or (not field.required and field.empty_strings_allowed)
898 or (isinstance(field, time_fields) and field.auto_now)
899 )
900 if not preserve_default:
901 field = field.clone()
902 if isinstance(field, time_fields) and field.auto_now_add:
903 field.default = self.questioner.ask_auto_now_add_addition(
904 field_name, model_name
905 )
906 else:
907 field.default = self.questioner.ask_not_null_addition(
908 field_name, model_name
909 )
910 if (
911 field.primary_key
912 and field.default is not NOT_PROVIDED
913 and callable(field.default)
914 ):
915 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
916 self.add_operation(
917 package_label,
918 operations.AddField(
919 model_name=model_name,
920 name=field_name,
921 field=field,
922 preserve_default=preserve_default,
923 ),
924 dependencies=dependencies,
925 )
926
927 def generate_removed_fields(self) -> None:
928 """Make RemoveField operations."""
929 for package_label, model_name, field_name in sorted(
930 self.old_field_keys - self.new_field_keys
931 ):
932 self._generate_removed_field(package_label, model_name, field_name)
933
934 def _generate_removed_field(
935 self, package_label: str, model_name: str, field_name: str
936 ) -> None:
937 self.add_operation(
938 package_label,
939 operations.RemoveField(
940 model_name=model_name,
941 name=field_name,
942 ),
943 )
944
945 def generate_altered_fields(self) -> None:
946 """
947 Make AlterField operations, or possibly RemovedField/AddField if alter
948 isn't possible.
949 """
950 for package_label, model_name, field_name in sorted(
951 self.old_field_keys & self.new_field_keys
952 ):
953 # Did the field change?
954 old_model_name = self.renamed_models.get(
955 (package_label, model_name), model_name
956 )
957 old_field_name = self.renamed_fields.get(
958 (package_label, model_name, field_name), field_name
959 )
960 old_field = self.from_state.models[package_label, old_model_name].get_field(
961 old_field_name
962 )
963 new_field = self.to_state.models[package_label, model_name].get_field(
964 field_name
965 )
966 dependencies = []
967 # Implement any model renames on relations; these are handled by RenameModel
968 # so we need to exclude them from the comparison
969 if hasattr(new_field, "remote_field") and getattr(
970 new_field.remote_field, "model", None
971 ):
972 rename_key = resolve_relation(
973 new_field.remote_field.model, package_label, model_name
974 )
975 if rename_key in self.renamed_models:
976 new_field.remote_field.model = old_field.remote_field.model
977 # Handle ForeignKeyField which can only have a single to_field.
978 remote_field_name = getattr(new_field.remote_field, "field_name", None)
979 if remote_field_name:
980 to_field_rename_key = rename_key + (remote_field_name,)
981 if to_field_rename_key in self.renamed_fields:
982 # Repoint model name only
983 new_field.remote_field.model = old_field.remote_field.model
984 dependencies.extend(
985 self._get_dependencies_for_foreign_key(
986 package_label,
987 model_name,
988 new_field,
989 self.to_state,
990 )
991 )
992 if hasattr(new_field, "remote_field") and getattr(
993 new_field.remote_field, "through", None
994 ):
995 rename_key = resolve_relation(
996 new_field.remote_field.through, package_label, model_name
997 )
998 if rename_key in self.renamed_models:
999 new_field.remote_field.through = old_field.remote_field.through
1000 old_field_dec = self.deep_deconstruct(old_field)
1001 new_field_dec = self.deep_deconstruct(new_field)
1002 # If the field was confirmed to be renamed it means that only
1003 # db_column was allowed to change which generate_renamed_fields()
1004 # already accounts for by adding an AlterField operation.
1005 if old_field_dec != new_field_dec and old_field_name == field_name:
1006 both_m2m = isinstance(old_field, ManyToManyField) and isinstance(
1007 new_field, ManyToManyField
1008 )
1009 neither_m2m = not isinstance(
1010 old_field, ManyToManyField
1011 ) and not isinstance(new_field, ManyToManyField)
1012 if both_m2m or neither_m2m:
1013 # Either both fields are m2m or neither is
1014 preserve_default = True
1015 if (
1016 old_field.allow_null
1017 and not new_field.allow_null
1018 and not new_field.has_default()
1019 and not isinstance(new_field, ManyToManyField)
1020 ):
1021 field = new_field.clone()
1022 new_default = self.questioner.ask_not_null_alteration(
1023 field_name, model_name
1024 )
1025 if new_default is not NOT_PROVIDED:
1026 field.default = new_default
1027 preserve_default = False
1028 else:
1029 field = new_field
1030 self.add_operation(
1031 package_label,
1032 operations.AlterField(
1033 model_name=model_name,
1034 name=field_name,
1035 field=field,
1036 preserve_default=preserve_default,
1037 ),
1038 dependencies=dependencies,
1039 )
1040 else:
1041 # We cannot alter between m2m and concrete fields
1042 self._generate_removed_field(package_label, model_name, field_name)
1043 self._generate_added_field(package_label, model_name, field_name)
1044
1045 def create_altered_indexes(self) -> None:
1046 option_name = operations.AddIndex.option_name
1047
1048 for package_label, model_name in sorted(self.kept_model_keys):
1049 old_model_name = self.renamed_models.get(
1050 (package_label, model_name), model_name
1051 )
1052 old_model_state = self.from_state.models[package_label, old_model_name]
1053 new_model_state = self.to_state.models[package_label, model_name]
1054
1055 old_indexes = old_model_state.options[option_name]
1056 new_indexes = new_model_state.options[option_name]
1057 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1058 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1059 renamed_indexes = []
1060 # Find renamed indexes.
1061 remove_from_added = []
1062 remove_from_removed = []
1063 for new_index in added_indexes:
1064 new_index_dec = new_index.deconstruct()
1065 new_index_name = new_index_dec[2].pop("name")
1066 for old_index in removed_indexes:
1067 old_index_dec = old_index.deconstruct()
1068 old_index_name = old_index_dec[2].pop("name")
1069 # Indexes are the same except for the names.
1070 if (
1071 new_index_dec == old_index_dec
1072 and new_index_name != old_index_name
1073 ):
1074 renamed_indexes.append((old_index_name, new_index_name, None))
1075 remove_from_added.append(new_index)
1076 remove_from_removed.append(old_index)
1077
1078 # Remove renamed indexes from the lists of added and removed
1079 # indexes.
1080 added_indexes = [
1081 idx for idx in added_indexes if idx not in remove_from_added
1082 ]
1083 removed_indexes = [
1084 idx for idx in removed_indexes if idx not in remove_from_removed
1085 ]
1086
1087 self.altered_indexes.update(
1088 {
1089 (package_label, model_name): {
1090 "added_indexes": added_indexes,
1091 "removed_indexes": removed_indexes,
1092 "renamed_indexes": renamed_indexes,
1093 }
1094 }
1095 )
1096
1097 def generate_added_indexes(self) -> None:
1098 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1099 dependencies = self._get_dependencies_for_model(package_label, model_name)
1100 for index in alt_indexes["added_indexes"]:
1101 self.add_operation(
1102 package_label,
1103 operations.AddIndex(
1104 model_name=model_name,
1105 index=index,
1106 ),
1107 dependencies=dependencies,
1108 )
1109
1110 def generate_removed_indexes(self) -> None:
1111 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1112 for index in alt_indexes["removed_indexes"]:
1113 self.add_operation(
1114 package_label,
1115 operations.RemoveIndex(
1116 model_name=model_name,
1117 name=index.name,
1118 ),
1119 )
1120
1121 def generate_renamed_indexes(self) -> None:
1122 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1123 for old_index_name, new_index_name, old_fields in alt_indexes[
1124 "renamed_indexes"
1125 ]:
1126 self.add_operation(
1127 package_label,
1128 operations.RenameIndex(
1129 model_name=model_name,
1130 new_name=new_index_name,
1131 old_name=old_index_name,
1132 old_fields=old_fields,
1133 ),
1134 )
1135
1136 def create_altered_constraints(self) -> None:
1137 option_name = operations.AddConstraint.option_name
1138 for package_label, model_name in sorted(self.kept_model_keys):
1139 old_model_name = self.renamed_models.get(
1140 (package_label, model_name), model_name
1141 )
1142 old_model_state = self.from_state.models[package_label, old_model_name]
1143 new_model_state = self.to_state.models[package_label, model_name]
1144
1145 old_constraints = old_model_state.options[option_name]
1146 new_constraints = new_model_state.options[option_name]
1147 add_constraints = [c for c in new_constraints if c not in old_constraints]
1148 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1149
1150 self.altered_constraints.update(
1151 {
1152 (package_label, model_name): {
1153 "added_constraints": add_constraints,
1154 "removed_constraints": rem_constraints,
1155 }
1156 }
1157 )
1158
1159 def generate_added_constraints(self) -> None:
1160 for (
1161 package_label,
1162 model_name,
1163 ), alt_constraints in self.altered_constraints.items():
1164 dependencies = self._get_dependencies_for_model(package_label, model_name)
1165 for constraint in alt_constraints["added_constraints"]:
1166 self.add_operation(
1167 package_label,
1168 operations.AddConstraint(
1169 model_name=model_name,
1170 constraint=constraint,
1171 ),
1172 dependencies=dependencies,
1173 )
1174
1175 def generate_removed_constraints(self) -> None:
1176 for (
1177 package_label,
1178 model_name,
1179 ), alt_constraints in self.altered_constraints.items():
1180 for constraint in alt_constraints["removed_constraints"]:
1181 self.add_operation(
1182 package_label,
1183 operations.RemoveConstraint(
1184 model_name=model_name,
1185 name=constraint.name,
1186 ),
1187 )
1188
1189 @staticmethod
1190 def _get_dependencies_for_foreign_key(
1191 package_label: str, model_name: str, field: Field, project_state: ProjectState
1192 ) -> list[tuple[str, str, str | None, bool | str]]:
1193 remote_field_model = None
1194 if isinstance(field, RelatedField):
1195 remote_field_model = field.remote_field.model
1196 else:
1197 relations = project_state.relations[package_label, model_name]
1198 for (remote_package_label, remote_model_name), fields in relations.items():
1199 if any(
1200 field == related_field.remote_field
1201 for related_field in fields.values()
1202 if isinstance(related_field, RelatedField)
1203 ):
1204 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1205 break
1206 dep_package_label, dep_object_name = resolve_relation(
1207 remote_field_model,
1208 package_label,
1209 model_name,
1210 )
1211 dependencies = [(dep_package_label, dep_object_name, None, True)]
1212 if isinstance(field, RelatedField) and isinstance(
1213 field.remote_field, ManyToManyRel
1214 ):
1215 through_package_label, through_object_name = resolve_relation(
1216 field.remote_field.through,
1217 package_label,
1218 model_name,
1219 )
1220 dependencies.append(
1221 (through_package_label, through_object_name, None, True)
1222 )
1223 return dependencies
1224
1225 def _get_dependencies_for_model(
1226 self, package_label: str, model_name: str
1227 ) -> list[tuple[str, str, str | None, bool | str]]:
1228 """Return foreign key dependencies of the given model."""
1229 dependencies = []
1230 model_state = self.to_state.models[package_label, model_name]
1231 for field in model_state.fields.values():
1232 if isinstance(field, RelatedField):
1233 dependencies.extend(
1234 self._get_dependencies_for_foreign_key(
1235 package_label,
1236 model_name,
1237 field,
1238 self.to_state,
1239 )
1240 )
1241 return dependencies
1242
1243 def generate_altered_db_table(self) -> None:
1244 for package_label, model_name in sorted(self.kept_model_keys):
1245 old_model_name = self.renamed_models.get(
1246 (package_label, model_name), model_name
1247 )
1248 old_model_state = self.from_state.models[package_label, old_model_name]
1249 new_model_state = self.to_state.models[package_label, model_name]
1250 old_db_table_name = old_model_state.options.get("db_table")
1251 new_db_table_name = new_model_state.options.get("db_table")
1252 if old_db_table_name != new_db_table_name:
1253 self.add_operation(
1254 package_label,
1255 operations.AlterModelTable(
1256 name=model_name,
1257 table=new_db_table_name,
1258 ),
1259 )
1260
1261 def generate_altered_db_table_comment(self) -> None:
1262 for package_label, model_name in sorted(self.kept_model_keys):
1263 old_model_name = self.renamed_models.get(
1264 (package_label, model_name), model_name
1265 )
1266 old_model_state = self.from_state.models[package_label, old_model_name]
1267 new_model_state = self.to_state.models[package_label, model_name]
1268
1269 old_db_table_comment = old_model_state.options.get("db_table_comment")
1270 new_db_table_comment = new_model_state.options.get("db_table_comment")
1271 if old_db_table_comment != new_db_table_comment:
1272 self.add_operation(
1273 package_label,
1274 operations.AlterModelTableComment(
1275 name=model_name,
1276 table_comment=new_db_table_comment,
1277 ),
1278 )
1279
1280 def generate_altered_options(self) -> None:
1281 """
1282 Work out if any non-schema-affecting options have changed and make an
1283 operation to represent them in state changes (in case Python code in
1284 migrations needs them).
1285 """
1286 for package_label, model_name in sorted(self.kept_model_keys):
1287 old_model_name = self.renamed_models.get(
1288 (package_label, model_name), model_name
1289 )
1290 old_model_state = self.from_state.models[package_label, old_model_name]
1291 new_model_state = self.to_state.models[package_label, model_name]
1292 old_options = {
1293 key: value
1294 for key, value in old_model_state.options.items()
1295 if key in AlterModelOptions.ALTER_OPTION_KEYS
1296 }
1297 new_options = {
1298 key: value
1299 for key, value in new_model_state.options.items()
1300 if key in AlterModelOptions.ALTER_OPTION_KEYS
1301 }
1302 if old_options != new_options:
1303 self.add_operation(
1304 package_label,
1305 operations.AlterModelOptions(
1306 name=model_name,
1307 options=new_options,
1308 ),
1309 )
1310
1311 def arrange_for_graph(
1312 self,
1313 changes: dict[str, list[Migration]],
1314 graph: MigrationGraph,
1315 migration_name: str | None = None,
1316 ) -> dict[str, list[Migration]]:
1317 """
1318 Take a result from changes() and a MigrationGraph, and fix the names
1319 and dependencies of the changes so they extend the graph from the leaf
1320 nodes for each app.
1321 """
1322 leaves = graph.leaf_nodes()
1323 name_map = {}
1324 for package_label, migrations in list(changes.items()):
1325 if not migrations:
1326 continue
1327 # Find the app label's current leaf node
1328 app_leaf = None
1329 for leaf in leaves:
1330 if leaf[0] == package_label:
1331 app_leaf = leaf
1332 break
1333 # Do they want an initial migration for this app?
1334 if app_leaf is None and not self.questioner.ask_initial(package_label):
1335 # They don't.
1336 for migration in migrations:
1337 name_map[(package_label, migration.name)] = (
1338 package_label,
1339 "__first__",
1340 )
1341 del changes[package_label]
1342 continue
1343 # Work out the next number in the sequence
1344 if app_leaf is None:
1345 next_number = 1
1346 else:
1347 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1348 # Name each migration
1349 for i, migration in enumerate(migrations):
1350 if i == 0 and app_leaf:
1351 migration.dependencies.append(app_leaf)
1352 new_name_parts = ["%04i" % next_number] # noqa: UP031
1353 if migration_name:
1354 new_name_parts.append(migration_name)
1355 elif i == 0 and not app_leaf:
1356 new_name_parts.append("initial")
1357 else:
1358 new_name_parts.append(migration.suggest_name()[:100])
1359 new_name = "_".join(new_name_parts)
1360 name_map[(package_label, migration.name)] = (package_label, new_name)
1361 next_number += 1
1362 migration.name = new_name
1363 # Now fix dependencies
1364 for migrations in changes.values():
1365 for migration in migrations:
1366 migration.dependencies = [
1367 name_map.get(d, d) for d in migration.dependencies
1368 ]
1369 return changes
1370
1371 def _trim_to_packages(
1372 self, changes: dict[str, list[Migration]], package_labels: set[str]
1373 ) -> dict[str, list[Migration]]:
1374 """
1375 Take changes from arrange_for_graph() and set of app labels, and return
1376 a modified set of changes which trims out as many migrations that are
1377 not in package_labels as possible. Note that some other migrations may
1378 still be present as they may be required dependencies.
1379 """
1380 # Gather other app dependencies in a first pass
1381 app_dependencies = {}
1382 for package_label, migrations in changes.items():
1383 for migration in migrations:
1384 for dep_package_label, name in migration.dependencies:
1385 app_dependencies.setdefault(package_label, set()).add(
1386 dep_package_label
1387 )
1388 required_packages = set(package_labels)
1389 # Keep resolving till there's no change
1390 old_required_packages = None
1391 while old_required_packages != required_packages:
1392 old_required_packages = set(required_packages)
1393 required_packages.update(
1394 *[
1395 app_dependencies.get(package_label, ())
1396 for package_label in required_packages
1397 ]
1398 )
1399 # Remove all migrations that aren't needed
1400 for package_label in list(changes):
1401 if package_label not in required_packages:
1402 del changes[package_label]
1403 return changes
1404
1405 @classmethod
1406 def parse_number(cls, name: str) -> int | None:
1407 """
1408 Given a migration name, try to extract a number from the beginning of
1409 it. For a squashed migration such as '0001_squashed_0004…', return the
1410 second number. If no number is found, return None.
1411 """
1412 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1413 return int(squashed_match[1])
1414 match = re.match(r"^\d+", name)
1415 if match:
1416 return int(match[0])
1417 return None