1from __future__ import annotations
2
3import functools
4import re
5from graphlib import TopologicalSorter
6from typing import TYPE_CHECKING, Any
7
8from plain.models.fields import (
9 NOT_PROVIDED,
10 DateField,
11 DateTimeField,
12 Field,
13 TimeField,
14)
15from plain.models.fields.related import ManyToManyField, RelatedField
16from plain.models.fields.reverse_related import ManyToManyRel
17from plain.models.migrations import operations
18from plain.models.migrations.migration import Migration, SettingsTuple
19from plain.models.migrations.operations.models import AlterModelOptions
20from plain.models.migrations.optimizer import MigrationOptimizer
21from plain.models.migrations.questioner import MigrationQuestioner
22from plain.models.migrations.utils import (
23 COMPILED_REGEX_TYPE,
24 RegexObject,
25 resolve_relation,
26)
27from plain.runtime import settings
28
29if TYPE_CHECKING:
30 from plain.models.migrations.graph import MigrationGraph
31 from plain.models.migrations.operations.base import Operation
32 from plain.models.migrations.state import ProjectState
33
34
35class MigrationAutodetector:
36 """
37 Take a pair of ProjectStates and compare them to see what the first would
38 need doing to make it match the second (the second usually being the
39 project's current state).
40
41 Note that this naturally operates on entire projects at a time,
42 as it's likely that changes interact (for example, you can't
43 add a ForeignKeyField without having a migration to add the table it
44 depends on first). A user interface may offer single-app usage
45 if it wishes, with the caveat that it may not always be possible.
46 """
47
48 def __init__(
49 self,
50 from_state: ProjectState,
51 to_state: ProjectState,
52 questioner: MigrationQuestioner | None = None,
53 ):
54 self.from_state = from_state
55 self.to_state = to_state
56 self.questioner = questioner or MigrationQuestioner()
57 self.existing_packages = {app for app, model in from_state.models}
58
59 def changes(
60 self,
61 graph: MigrationGraph,
62 trim_to_packages: set[str] | None = None,
63 convert_packages: set[str] | None = None,
64 migration_name: str | None = None,
65 ) -> dict[str, list[Migration]]:
66 """
67 Main entry point to produce a list of applicable changes.
68 Take a graph to base names on and an optional set of packages
69 to try and restrict to (restriction is not guaranteed)
70 """
71 changes = self._detect_changes(convert_packages, graph)
72 changes = self.arrange_for_graph(changes, graph, migration_name)
73 if trim_to_packages:
74 changes = self._trim_to_packages(changes, trim_to_packages)
75 return changes
76
77 def deep_deconstruct(self, obj: Any) -> Any:
78 """
79 Recursive deconstruction for a field and its arguments.
80 Used for full comparison for rename/alter; sometimes a single-level
81 deconstruction will not compare correctly.
82 """
83 if isinstance(obj, list):
84 return [self.deep_deconstruct(value) for value in obj]
85 elif isinstance(obj, tuple):
86 return tuple(self.deep_deconstruct(value) for value in obj)
87 elif isinstance(obj, dict):
88 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
89 elif isinstance(obj, functools.partial):
90 return (
91 obj.func,
92 self.deep_deconstruct(obj.args),
93 self.deep_deconstruct(obj.keywords),
94 )
95 elif isinstance(obj, COMPILED_REGEX_TYPE):
96 return RegexObject(obj)
97 elif isinstance(obj, type):
98 # If this is a type that implements 'deconstruct' as an instance method,
99 # avoid treating this as being deconstructible itself - see #22951
100 return obj
101 elif hasattr(obj, "deconstruct"):
102 deconstructed = obj.deconstruct()
103 if isinstance(obj, Field):
104 # we have a field which also returns a name
105 deconstructed = deconstructed[1:]
106 path, args, kwargs = deconstructed
107 return (
108 path,
109 [self.deep_deconstruct(value) for value in args],
110 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
111 )
112 else:
113 return obj
114
115 def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
116 """
117 Return a definition of the fields that ignores field names and
118 what related fields actually relate to. Used for detecting renames (as
119 the related fields change during renames).
120 """
121 fields_def = []
122 for name, field in sorted(fields.items()):
123 deconstruction = self.deep_deconstruct(field)
124 if isinstance(field, RelatedField) and field.remote_field.model:
125 deconstruction[2].pop("to", None)
126 fields_def.append(deconstruction)
127 return fields_def
128
129 def _detect_changes(
130 self,
131 convert_packages: set[str] | None = None,
132 graph: MigrationGraph | None = None,
133 ) -> dict[str, list[Migration]]:
134 """
135 Return a dict of migration plans which will achieve the
136 change from from_state to to_state. The dict has app labels
137 as keys and a list of migrations as values.
138
139 The resulting migrations aren't specially named, but the names
140 do matter for dependencies inside the set.
141
142 convert_packages is the list of packages to convert to use migrations
143 (i.e. to make initial migrations for, in the usual case)
144
145 graph is an optional argument that, if provided, can help improve
146 dependency generation and avoid potential circular dependencies.
147 """
148 # The first phase is generating all the operations for each app
149 # and gathering them into a big per-app list.
150 # Then go through that list, order it, and split into migrations to
151 # resolve dependencies caused by M2Ms and FKs.
152 self.generated_operations = {}
153 self.altered_indexes = {}
154 self.altered_constraints = {}
155 self.renamed_fields = {}
156
157 # Prepare some old/new state and model lists, ignoring unmigrated packages.
158 self.old_model_keys = set()
159 self.new_model_keys = set()
160 for (package_label, model_name), model_state in self.from_state.models.items():
161 if package_label not in self.from_state.real_packages:
162 self.old_model_keys.add((package_label, model_name))
163
164 for (package_label, model_name), model_state in self.to_state.models.items():
165 if package_label not in self.from_state.real_packages or (
166 convert_packages and package_label in convert_packages
167 ):
168 self.new_model_keys.add((package_label, model_name))
169
170 self.from_state.resolve_fields_and_relations()
171 self.to_state.resolve_fields_and_relations()
172
173 # Renames have to come first
174 self.generate_renamed_models()
175
176 # Prepare lists of fields and generate through model map
177 self._prepare_field_lists()
178 self._generate_through_model_map()
179
180 # Generate non-rename model operations
181 self.generate_deleted_models()
182 self.generate_created_models()
183 self.generate_altered_options()
184
185 # Create the renamed fields and store them in self.renamed_fields.
186 # They are used by create_altered_indexes(), generate_altered_fields(),
187 # generate_removed_altered_index(), and
188 # generate_altered_index().
189 self.create_renamed_fields()
190 # Create the altered indexes and store them in self.altered_indexes.
191 # This avoids the same computation in generate_removed_indexes()
192 # and generate_added_indexes().
193 self.create_altered_indexes()
194 self.create_altered_constraints()
195 # Generate index removal operations before field is removed
196 self.generate_removed_constraints()
197 self.generate_removed_indexes()
198 # Generate field renaming operations.
199 self.generate_renamed_fields()
200 self.generate_renamed_indexes()
201 # Generate field operations.
202 self.generate_removed_fields()
203 self.generate_added_fields()
204 self.generate_altered_fields()
205 self.generate_added_indexes()
206 self.generate_added_constraints()
207 self.generate_altered_db_table()
208
209 self._sort_migrations()
210 self._build_migration_list(graph)
211 self._optimize_migrations()
212
213 return self.migrations
214
215 def _prepare_field_lists(self) -> None:
216 """
217 Prepare field lists and a list of the fields that used through models
218 in the old state so dependencies can be made from the through model
219 deletion to the field that uses it.
220 """
221 self.kept_model_keys = self.old_model_keys & self.new_model_keys
222 self.through_users = {}
223 self.old_field_keys = {
224 (package_label, model_name, field_name)
225 for package_label, model_name in self.kept_model_keys
226 for field_name in self.from_state.models[
227 package_label,
228 self.renamed_models.get((package_label, model_name), model_name),
229 ].fields
230 }
231 self.new_field_keys = {
232 (package_label, model_name, field_name)
233 for package_label, model_name in self.kept_model_keys
234 for field_name in self.to_state.models[package_label, model_name].fields
235 }
236
237 def _generate_through_model_map(self) -> None:
238 """Through model map generation."""
239 for package_label, model_name in sorted(self.old_model_keys):
240 old_model_name = self.renamed_models.get(
241 (package_label, model_name), model_name
242 )
243 old_model_state = self.from_state.models[package_label, old_model_name]
244 for field_name, field in old_model_state.fields.items():
245 if hasattr(field, "remote_field") and getattr(
246 field.remote_field, "through", None
247 ):
248 through_key = resolve_relation(
249 field.remote_field.through, package_label, model_name
250 )
251 self.through_users[through_key] = (
252 package_label,
253 old_model_name,
254 field_name,
255 )
256
257 @staticmethod
258 def _resolve_dependency(
259 dependency: tuple[str, str, str | None, bool | str],
260 ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
261 """
262 Return the resolved dependency and a boolean denoting whether or not
263 it was a settings dependency.
264 """
265 if not isinstance(dependency, SettingsTuple):
266 return dependency, False
267 resolved_package_label, resolved_object_name = getattr(
268 settings, dependency[1]
269 ).split(".")
270 return (resolved_package_label, resolved_object_name.lower()) + dependency[
271 2:
272 ], True
273
274 def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
275 """
276 Chop the lists of operations up into migrations with dependencies on
277 each other. Do this by going through an app's list of operations until
278 one is found that has an outgoing dependency that isn't in another
279 app's migration yet (hasn't been chopped off its list). Then chop off
280 the operations before it into a migration and move onto the next app.
281 If the loops completes without doing anything, there's a circular
282 dependency (which _should_ be impossible as the operations are
283 all split at this point so they can't depend and be depended on).
284 """
285 self.migrations = {}
286 num_ops = sum(len(x) for x in self.generated_operations.values())
287 chop_mode = False
288 while num_ops:
289 # On every iteration, we step through all the packages and see if there
290 # is a completed set of operations.
291 # If we find that a subset of the operations are complete we can
292 # try to chop it off from the rest and continue, but we only
293 # do this if we've already been through the list once before
294 # without any chopping and nothing has changed.
295 for package_label in sorted(self.generated_operations):
296 chopped = []
297 dependencies = set()
298 for operation in list(self.generated_operations[package_label]):
299 deps_satisfied = True
300 operation_dependencies = set()
301 for dep in operation._auto_deps:
302 # Temporarily resolve the settings dependency to
303 # prevent circular references. While keeping the
304 # dependency checks on the resolved model, add the
305 # settings dependencies.
306 original_dep = dep
307 dep, is_settings_dep = self._resolve_dependency(dep)
308 if dep[0] != package_label:
309 # External app dependency. See if it's not yet
310 # satisfied.
311 for other_operation in self.generated_operations.get(
312 dep[0], []
313 ):
314 if self.check_dependency(other_operation, dep):
315 deps_satisfied = False
316 break
317 if not deps_satisfied:
318 break
319 else:
320 if is_settings_dep:
321 operation_dependencies.add(
322 (original_dep[0], original_dep[1])
323 )
324 elif dep[0] in self.migrations:
325 operation_dependencies.add(
326 (dep[0], self.migrations[dep[0]][-1].name)
327 )
328 else:
329 # If we can't find the other app, we add a
330 # first/last dependency, but only if we've
331 # already been through once and checked
332 # everything.
333 if chop_mode:
334 # If the app already exists, we add a
335 # dependency on the last migration, as
336 # we don't know which migration
337 # contains the target field. If it's
338 # not yet migrated or has no
339 # migrations, we use __first__.
340 if graph and graph.leaf_nodes(dep[0]):
341 operation_dependencies.add(
342 graph.leaf_nodes(dep[0])[0]
343 )
344 else:
345 operation_dependencies.add(
346 (dep[0], "__first__")
347 )
348 else:
349 deps_satisfied = False
350 if deps_satisfied:
351 chopped.append(operation)
352 dependencies.update(operation_dependencies)
353 del self.generated_operations[package_label][0]
354 else:
355 break
356 # Make a migration! Well, only if there's stuff to put in it
357 if dependencies or chopped:
358 if not self.generated_operations[package_label] or chop_mode:
359 subclass = type(
360 "Migration",
361 (Migration,),
362 {"operations": [], "dependencies": []},
363 )
364 instance = subclass(
365 "auto_%i" # noqa: UP031
366 % (len(self.migrations.get(package_label, [])) + 1),
367 package_label,
368 )
369 instance.dependencies = list(dependencies)
370 instance.operations = chopped
371 instance.initial = package_label not in self.existing_packages
372 self.migrations.setdefault(package_label, []).append(instance)
373 chop_mode = False
374 else:
375 self.generated_operations[package_label] = (
376 chopped + self.generated_operations[package_label]
377 )
378 new_num_ops = sum(len(x) for x in self.generated_operations.values())
379 if new_num_ops == num_ops:
380 if not chop_mode:
381 chop_mode = True
382 else:
383 raise ValueError(
384 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
385 )
386 num_ops = new_num_ops
387
388 def _sort_migrations(self) -> None:
389 """
390 Reorder to make things possible. Reordering may be needed so FKs work
391 nicely inside the same app.
392 """
393 for package_label, ops in sorted(self.generated_operations.items()):
394 ts = TopologicalSorter()
395 for op in ops:
396 ts.add(op)
397 for dep in op._auto_deps:
398 # Resolve intra-app dependencies to handle circular
399 # references involving a settings model.
400 dep = self._resolve_dependency(dep)[0]
401 if dep[0] != package_label:
402 continue
403 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
404 self.generated_operations[package_label] = list(ts.static_order())
405
406 def _optimize_migrations(self) -> None:
407 # Add in internal dependencies among the migrations
408 for package_label, migrations in self.migrations.items():
409 for m1, m2 in zip(migrations, migrations[1:]):
410 m2.dependencies.append((package_label, m1.name))
411
412 # De-dupe dependencies
413 for migrations in self.migrations.values():
414 for migration in migrations:
415 migration.dependencies = list(set(migration.dependencies))
416
417 # Optimize migrations
418 for package_label, migrations in self.migrations.items():
419 for migration in migrations:
420 migration.operations = MigrationOptimizer().optimize(
421 migration.operations, package_label
422 )
423
424 def check_dependency(
425 self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
426 ) -> bool:
427 """
428 Return True if the given operation depends on the given dependency,
429 False otherwise.
430 """
431 # Created model
432 if dependency[2] is None and dependency[3] is True:
433 return (
434 isinstance(operation, operations.CreateModel)
435 and operation.name_lower == dependency[1].lower()
436 )
437 # Created field
438 elif dependency[2] is not None and dependency[3] is True:
439 return (
440 isinstance(operation, operations.CreateModel)
441 and operation.name_lower == dependency[1].lower()
442 and any(dependency[2] == x for x, y in operation.fields)
443 ) or (
444 isinstance(operation, operations.AddField)
445 and operation.model_name_lower == dependency[1].lower()
446 and operation.name_lower == dependency[2].lower()
447 )
448 # Removed field
449 elif dependency[2] is not None and dependency[3] is False:
450 return (
451 isinstance(operation, operations.RemoveField)
452 and operation.model_name_lower == dependency[1].lower()
453 and operation.name_lower == dependency[2].lower()
454 )
455 # Removed model
456 elif dependency[2] is None and dependency[3] is False:
457 return (
458 isinstance(operation, operations.DeleteModel)
459 and operation.name_lower == dependency[1].lower()
460 )
461 # Field being altered
462 elif dependency[2] is not None and dependency[3] == "alter":
463 return (
464 isinstance(operation, operations.AlterField)
465 and operation.model_name_lower == dependency[1].lower()
466 and operation.name_lower == dependency[2].lower()
467 )
468 # Unknown dependency. Raise an error.
469 else:
470 raise ValueError(f"Can't handle dependency {dependency!r}")
471
472 def add_operation(
473 self,
474 package_label: str,
475 operation: Operation,
476 dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
477 beginning: bool = False,
478 ) -> None:
479 # Operation dependencies are 4-element tuples:
480 # (package_label, model_name, field_name, create/delete as True/False or "alter")
481 operation._auto_deps = dependencies or []
482 if beginning:
483 self.generated_operations.setdefault(package_label, []).insert(0, operation)
484 else:
485 self.generated_operations.setdefault(package_label, []).append(operation)
486
487 def generate_renamed_models(self) -> None:
488 """
489 Find any renamed models, generate the operations for them, and remove
490 the old entry from the model lists. Must be run before other
491 model-level generation.
492 """
493 self.renamed_models = {}
494 self.renamed_models_rel = {}
495 added_models = self.new_model_keys - self.old_model_keys
496 for package_label, model_name in sorted(added_models):
497 model_state = self.to_state.models[package_label, model_name]
498 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
499
500 removed_models = self.old_model_keys - self.new_model_keys
501 for rem_package_label, rem_model_name in removed_models:
502 if rem_package_label == package_label:
503 rem_model_state = self.from_state.models[
504 rem_package_label, rem_model_name
505 ]
506 rem_model_fields_def = self.only_relation_agnostic_fields(
507 rem_model_state.fields
508 )
509 if model_fields_def == rem_model_fields_def:
510 if self.questioner.ask_rename_model(
511 rem_model_state, model_state
512 ):
513 dependencies = []
514 fields = list(model_state.fields.values()) + [
515 field.remote_field
516 for relations in self.to_state.relations[
517 package_label, model_name
518 ].values()
519 for field in relations.values()
520 if isinstance(field, RelatedField)
521 ]
522 for field in fields:
523 if isinstance(field, RelatedField):
524 dependencies.extend(
525 self._get_dependencies_for_foreign_key(
526 package_label,
527 model_name,
528 field,
529 self.to_state,
530 )
531 )
532 self.add_operation(
533 package_label,
534 operations.RenameModel(
535 old_name=rem_model_state.name,
536 new_name=model_state.name,
537 ),
538 dependencies=dependencies,
539 )
540 self.renamed_models[package_label, model_name] = (
541 rem_model_name
542 )
543 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
544 self.renamed_models_rel[renamed_models_rel_key] = (
545 f"{model_state.package_label}.{model_state.name_lower}"
546 )
547 self.old_model_keys.remove(
548 (rem_package_label, rem_model_name)
549 )
550 self.old_model_keys.add((package_label, model_name))
551 break
552
553 def generate_created_models(self) -> None:
554 """
555 Find all new models and make create
556 operations for them as well as separate operations to create any
557 foreign key or M2M relationships (these are optimized later, if
558 possible).
559
560 Defer any model options that refer to collections of fields that might
561 be deferred.
562 """
563 added_models = self.new_model_keys - self.old_model_keys
564
565 for package_label, model_name in added_models:
566 model_state = self.to_state.models[package_label, model_name]
567 # Gather related fields
568 related_fields = {}
569 primary_key_rel = None
570 for field_name, field in model_state.fields.items():
571 if isinstance(field, RelatedField):
572 if field.remote_field.model:
573 if field.primary_key:
574 primary_key_rel = field.remote_field.model
575 else:
576 related_fields[field_name] = field
577 if isinstance(field.remote_field, ManyToManyRel):
578 related_fields[field_name] = field
579
580 # Are there indexes to defer?
581 indexes = model_state.options.pop("indexes")
582 constraints = model_state.options.pop("constraints")
583 # Depend on the deletion of any possible proxy version of us
584 dependencies = [
585 (package_label, model_name, None, False),
586 ]
587 # Depend on all bases
588 for base in model_state.bases:
589 if isinstance(base, str) and "." in base:
590 base_package_label, base_name = base.split(".", 1)
591 dependencies.append((base_package_label, base_name, None, True))
592 # Depend on the removal of base fields if the new model has
593 # a field with the same name.
594 old_base_model_state = self.from_state.models.get(
595 (base_package_label, base_name)
596 )
597 new_base_model_state = self.to_state.models.get(
598 (base_package_label, base_name)
599 )
600 if old_base_model_state and new_base_model_state:
601 removed_base_fields = (
602 set(old_base_model_state.fields)
603 .difference(
604 new_base_model_state.fields,
605 )
606 .intersection(model_state.fields)
607 )
608 for removed_base_field in removed_base_fields:
609 dependencies.append(
610 (
611 base_package_label,
612 base_name,
613 removed_base_field,
614 False,
615 )
616 )
617 # Depend on the other end of the primary key if it's a relation
618 if primary_key_rel:
619 dependencies.append(
620 resolve_relation(
621 primary_key_rel,
622 package_label,
623 model_name,
624 )
625 + (None, True)
626 )
627 # Generate creation operation
628 self.add_operation(
629 package_label,
630 operations.CreateModel(
631 name=model_state.name,
632 fields=[
633 d
634 for d in model_state.fields.items()
635 if d[0] not in related_fields
636 ],
637 options=model_state.options,
638 bases=model_state.bases,
639 ),
640 dependencies=dependencies,
641 beginning=True,
642 )
643
644 # Generate operations for each related field
645 for name, field in sorted(related_fields.items()):
646 dependencies = self._get_dependencies_for_foreign_key(
647 package_label,
648 model_name,
649 field,
650 self.to_state,
651 )
652 # Depend on our own model being created
653 dependencies.append((package_label, model_name, None, True))
654 # Make operation
655 self.add_operation(
656 package_label,
657 operations.AddField(
658 model_name=model_name,
659 name=name,
660 field=field,
661 ),
662 dependencies=list(set(dependencies)),
663 )
664
665 related_dependencies = [
666 (package_label, model_name, name, True)
667 for name in sorted(related_fields)
668 ]
669 related_dependencies.append((package_label, model_name, None, True))
670 for index in indexes:
671 self.add_operation(
672 package_label,
673 operations.AddIndex(
674 model_name=model_name,
675 index=index,
676 ),
677 dependencies=related_dependencies,
678 )
679 for constraint in constraints:
680 self.add_operation(
681 package_label,
682 operations.AddConstraint(
683 model_name=model_name,
684 constraint=constraint,
685 ),
686 dependencies=related_dependencies,
687 )
688
689 def generate_deleted_models(self) -> None:
690 """
691 Find all deleted models and make delete
692 operations for them as well as separate operations to delete any
693 foreign key or M2M relationships (these are optimized later, if
694 possible).
695
696 Also bring forward removal of any model options that refer to
697 collections of fields - the inverse of generate_created_models().
698 """
699 deleted_models = self.old_model_keys - self.new_model_keys
700
701 for package_label, model_name in sorted(deleted_models):
702 model_state = self.from_state.models[package_label, model_name]
703 # Gather related fields
704 related_fields = {}
705 for field_name, field in model_state.fields.items():
706 if isinstance(field, RelatedField):
707 if field.remote_field.model:
708 related_fields[field_name] = field
709 if isinstance(field.remote_field, ManyToManyRel):
710 related_fields[field_name] = field
711
712 # Then remove each related field
713 for name in sorted(related_fields):
714 self.add_operation(
715 package_label,
716 operations.RemoveField(
717 model_name=model_name,
718 name=name,
719 ),
720 )
721 # Finally, remove the model.
722 # This depends on both the removal/alteration of all incoming fields
723 # and the removal of all its own related fields, and if it's
724 # a through model the field that references it.
725 dependencies = []
726 relations = self.from_state.relations
727 for (
728 related_object_package_label,
729 object_name,
730 ), relation_related_fields in relations[package_label, model_name].items():
731 for field_name, field in relation_related_fields.items():
732 dependencies.append(
733 (related_object_package_label, object_name, field_name, False),
734 )
735 if not isinstance(field, ManyToManyField):
736 dependencies.append(
737 (
738 related_object_package_label,
739 object_name,
740 field_name,
741 "alter",
742 ),
743 )
744
745 for name in sorted(related_fields):
746 dependencies.append((package_label, model_name, name, False))
747 # We're referenced in another field's through=
748 through_user = self.through_users.get(
749 (package_label, model_state.name_lower)
750 )
751 if through_user:
752 dependencies.append(
753 (through_user[0], through_user[1], through_user[2], False)
754 )
755 # Finally, make the operation, deduping any dependencies
756 self.add_operation(
757 package_label,
758 operations.DeleteModel(
759 name=model_state.name,
760 ),
761 dependencies=list(set(dependencies)),
762 )
763
764 def create_renamed_fields(self) -> None:
765 """Work out renamed fields."""
766 self.renamed_operations = []
767 old_field_keys = self.old_field_keys.copy()
768 for package_label, model_name, field_name in sorted(
769 self.new_field_keys - old_field_keys
770 ):
771 old_model_name = self.renamed_models.get(
772 (package_label, model_name), model_name
773 )
774 old_model_state = self.from_state.models[package_label, old_model_name]
775 new_model_state = self.to_state.models[package_label, model_name]
776 field = new_model_state.get_field(field_name)
777 # Scan to see if this is actually a rename!
778 field_dec = self.deep_deconstruct(field)
779 for rem_package_label, rem_model_name, rem_field_name in sorted(
780 old_field_keys - self.new_field_keys
781 ):
782 if rem_package_label == package_label and rem_model_name == model_name:
783 old_field = old_model_state.get_field(rem_field_name)
784 old_field_dec = self.deep_deconstruct(old_field)
785 if (
786 isinstance(field, RelatedField)
787 and field.remote_field
788 and field.remote_field.model
789 and "to" in old_field_dec[2]
790 ):
791 old_rel_to = old_field_dec[2]["to"]
792 if old_rel_to in self.renamed_models_rel:
793 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
794 old_field.set_attributes_from_name(rem_field_name)
795 if old_field_dec == field_dec:
796 if self.questioner.ask_rename(
797 model_name, rem_field_name, field_name, field
798 ):
799 self.renamed_operations.append(
800 (
801 rem_package_label,
802 rem_model_name,
803 rem_field_name,
804 package_label,
805 model_name,
806 field,
807 field_name,
808 )
809 )
810 old_field_keys.remove(
811 (rem_package_label, rem_model_name, rem_field_name)
812 )
813 old_field_keys.add((package_label, model_name, field_name))
814 self.renamed_fields[
815 package_label, model_name, field_name
816 ] = rem_field_name
817 break
818
819 def generate_renamed_fields(self) -> None:
820 """Generate RenameField operations."""
821 for (
822 rem_package_label,
823 rem_model_name,
824 rem_field_name,
825 package_label,
826 model_name,
827 field,
828 field_name,
829 ) in self.renamed_operations:
830 self.add_operation(
831 package_label,
832 operations.RenameField(
833 model_name=model_name,
834 old_name=rem_field_name,
835 new_name=field_name,
836 ),
837 )
838 self.old_field_keys.remove(
839 (rem_package_label, rem_model_name, rem_field_name)
840 )
841 self.old_field_keys.add((package_label, model_name, field_name))
842
843 def generate_added_fields(self) -> None:
844 """Make AddField operations."""
845 for package_label, model_name, field_name in sorted(
846 self.new_field_keys - self.old_field_keys
847 ):
848 self._generate_added_field(package_label, model_name, field_name)
849
850 def _generate_added_field(
851 self, package_label: str, model_name: str, field_name: str
852 ) -> None:
853 field = self.to_state.models[package_label, model_name].get_field(field_name)
854 # Adding a field always depends at least on its removal.
855 dependencies = [(package_label, model_name, field_name, False)]
856 # Fields that are foreignkeys/m2ms depend on stuff.
857 if isinstance(field, RelatedField) and field.remote_field.model:
858 dependencies.extend(
859 self._get_dependencies_for_foreign_key(
860 package_label,
861 model_name,
862 field,
863 self.to_state,
864 )
865 )
866 # You can't just add NOT NULL fields with no default or fields
867 # which don't allow empty strings as default.
868 time_fields = (DateField, DateTimeField, TimeField)
869 preserve_default = (
870 field.allow_null
871 or field.has_default()
872 or isinstance(field, ManyToManyField)
873 or (not field.required and field.empty_strings_allowed)
874 or (isinstance(field, time_fields) and field.auto_now)
875 )
876 if not preserve_default:
877 field = field.clone()
878 if isinstance(field, time_fields) and field.auto_now_add:
879 field.default = self.questioner.ask_auto_now_add_addition(
880 field_name, model_name
881 )
882 else:
883 field.default = self.questioner.ask_not_null_addition(
884 field_name, model_name
885 )
886 if (
887 field.primary_key
888 and field.default is not NOT_PROVIDED
889 and callable(field.default)
890 ):
891 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
892 self.add_operation(
893 package_label,
894 operations.AddField(
895 model_name=model_name,
896 name=field_name,
897 field=field,
898 preserve_default=preserve_default,
899 ),
900 dependencies=dependencies,
901 )
902
903 def generate_removed_fields(self) -> None:
904 """Make RemoveField operations."""
905 for package_label, model_name, field_name in sorted(
906 self.old_field_keys - self.new_field_keys
907 ):
908 self._generate_removed_field(package_label, model_name, field_name)
909
910 def _generate_removed_field(
911 self, package_label: str, model_name: str, field_name: str
912 ) -> None:
913 self.add_operation(
914 package_label,
915 operations.RemoveField(
916 model_name=model_name,
917 name=field_name,
918 ),
919 )
920
921 def generate_altered_fields(self) -> None:
922 """
923 Make AlterField operations, or possibly RemovedField/AddField if alter
924 isn't possible.
925 """
926 for package_label, model_name, field_name in sorted(
927 self.old_field_keys & self.new_field_keys
928 ):
929 # Did the field change?
930 old_model_name = self.renamed_models.get(
931 (package_label, model_name), model_name
932 )
933 old_field_name = self.renamed_fields.get(
934 (package_label, model_name, field_name), field_name
935 )
936 old_field = self.from_state.models[package_label, old_model_name].get_field(
937 old_field_name
938 )
939 new_field = self.to_state.models[package_label, model_name].get_field(
940 field_name
941 )
942 dependencies = []
943 # Implement any model renames on relations; these are handled by RenameModel
944 # so we need to exclude them from the comparison
945 if hasattr(new_field, "remote_field") and getattr(
946 new_field.remote_field, "model", None
947 ):
948 rename_key = resolve_relation(
949 new_field.remote_field.model, package_label, model_name
950 )
951 if rename_key in self.renamed_models:
952 new_field.remote_field.model = old_field.remote_field.model
953 # Handle ForeignKeyField which can only have a single to_field.
954 remote_field_name = getattr(new_field.remote_field, "field_name", None)
955 if remote_field_name:
956 to_field_rename_key = rename_key + (remote_field_name,)
957 if to_field_rename_key in self.renamed_fields:
958 # Repoint model name only
959 new_field.remote_field.model = old_field.remote_field.model
960 dependencies.extend(
961 self._get_dependencies_for_foreign_key(
962 package_label,
963 model_name,
964 new_field,
965 self.to_state,
966 )
967 )
968 if hasattr(new_field, "remote_field") and getattr(
969 new_field.remote_field, "through", None
970 ):
971 rename_key = resolve_relation(
972 new_field.remote_field.through, package_label, model_name
973 )
974 if rename_key in self.renamed_models:
975 new_field.remote_field.through = old_field.remote_field.through
976 old_field_dec = self.deep_deconstruct(old_field)
977 new_field_dec = self.deep_deconstruct(new_field)
978 if old_field_dec != new_field_dec and old_field_name == field_name:
979 both_m2m = isinstance(old_field, ManyToManyField) and isinstance(
980 new_field, ManyToManyField
981 )
982 neither_m2m = not isinstance(
983 old_field, ManyToManyField
984 ) and not isinstance(new_field, ManyToManyField)
985 if both_m2m or neither_m2m:
986 # Either both fields are m2m or neither is
987 preserve_default = True
988 if (
989 old_field.allow_null
990 and not new_field.allow_null
991 and not new_field.has_default()
992 and not isinstance(new_field, ManyToManyField)
993 ):
994 field = new_field.clone()
995 new_default = self.questioner.ask_not_null_alteration(
996 field_name, model_name
997 )
998 if new_default is not NOT_PROVIDED:
999 field.default = new_default
1000 preserve_default = False
1001 else:
1002 field = new_field
1003 self.add_operation(
1004 package_label,
1005 operations.AlterField(
1006 model_name=model_name,
1007 name=field_name,
1008 field=field,
1009 preserve_default=preserve_default,
1010 ),
1011 dependencies=dependencies,
1012 )
1013 else:
1014 # We cannot alter between m2m and concrete fields
1015 self._generate_removed_field(package_label, model_name, field_name)
1016 self._generate_added_field(package_label, model_name, field_name)
1017
1018 def create_altered_indexes(self) -> None:
1019 option_name = operations.AddIndex.option_name
1020
1021 for package_label, model_name in sorted(self.kept_model_keys):
1022 old_model_name = self.renamed_models.get(
1023 (package_label, model_name), model_name
1024 )
1025 old_model_state = self.from_state.models[package_label, old_model_name]
1026 new_model_state = self.to_state.models[package_label, model_name]
1027
1028 old_indexes = old_model_state.options[option_name]
1029 new_indexes = new_model_state.options[option_name]
1030 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1031 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1032 renamed_indexes = []
1033 # Find renamed indexes.
1034 remove_from_added = []
1035 remove_from_removed = []
1036 for new_index in added_indexes:
1037 new_index_dec = new_index.deconstruct()
1038 new_index_name = new_index_dec[2].pop("name")
1039 for old_index in removed_indexes:
1040 old_index_dec = old_index.deconstruct()
1041 old_index_name = old_index_dec[2].pop("name")
1042 # Indexes are the same except for the names.
1043 if (
1044 new_index_dec == old_index_dec
1045 and new_index_name != old_index_name
1046 ):
1047 renamed_indexes.append((old_index_name, new_index_name, None))
1048 remove_from_added.append(new_index)
1049 remove_from_removed.append(old_index)
1050
1051 # Remove renamed indexes from the lists of added and removed
1052 # indexes.
1053 added_indexes = [
1054 idx for idx in added_indexes if idx not in remove_from_added
1055 ]
1056 removed_indexes = [
1057 idx for idx in removed_indexes if idx not in remove_from_removed
1058 ]
1059
1060 self.altered_indexes.update(
1061 {
1062 (package_label, model_name): {
1063 "added_indexes": added_indexes,
1064 "removed_indexes": removed_indexes,
1065 "renamed_indexes": renamed_indexes,
1066 }
1067 }
1068 )
1069
1070 def generate_added_indexes(self) -> None:
1071 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1072 dependencies = self._get_dependencies_for_model(package_label, model_name)
1073 for index in alt_indexes["added_indexes"]:
1074 self.add_operation(
1075 package_label,
1076 operations.AddIndex(
1077 model_name=model_name,
1078 index=index,
1079 ),
1080 dependencies=dependencies,
1081 )
1082
1083 def generate_removed_indexes(self) -> None:
1084 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1085 for index in alt_indexes["removed_indexes"]:
1086 self.add_operation(
1087 package_label,
1088 operations.RemoveIndex(
1089 model_name=model_name,
1090 name=index.name,
1091 ),
1092 )
1093
1094 def generate_renamed_indexes(self) -> None:
1095 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1096 for old_index_name, new_index_name, old_fields in alt_indexes[
1097 "renamed_indexes"
1098 ]:
1099 self.add_operation(
1100 package_label,
1101 operations.RenameIndex(
1102 model_name=model_name,
1103 new_name=new_index_name,
1104 old_name=old_index_name,
1105 old_fields=old_fields,
1106 ),
1107 )
1108
1109 def create_altered_constraints(self) -> None:
1110 option_name = operations.AddConstraint.option_name
1111 for package_label, model_name in sorted(self.kept_model_keys):
1112 old_model_name = self.renamed_models.get(
1113 (package_label, model_name), model_name
1114 )
1115 old_model_state = self.from_state.models[package_label, old_model_name]
1116 new_model_state = self.to_state.models[package_label, model_name]
1117
1118 old_constraints = old_model_state.options[option_name]
1119 new_constraints = new_model_state.options[option_name]
1120 add_constraints = [c for c in new_constraints if c not in old_constraints]
1121 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1122
1123 self.altered_constraints.update(
1124 {
1125 (package_label, model_name): {
1126 "added_constraints": add_constraints,
1127 "removed_constraints": rem_constraints,
1128 }
1129 }
1130 )
1131
1132 def generate_added_constraints(self) -> None:
1133 for (
1134 package_label,
1135 model_name,
1136 ), alt_constraints in self.altered_constraints.items():
1137 dependencies = self._get_dependencies_for_model(package_label, model_name)
1138 for constraint in alt_constraints["added_constraints"]:
1139 self.add_operation(
1140 package_label,
1141 operations.AddConstraint(
1142 model_name=model_name,
1143 constraint=constraint,
1144 ),
1145 dependencies=dependencies,
1146 )
1147
1148 def generate_removed_constraints(self) -> None:
1149 for (
1150 package_label,
1151 model_name,
1152 ), alt_constraints in self.altered_constraints.items():
1153 for constraint in alt_constraints["removed_constraints"]:
1154 self.add_operation(
1155 package_label,
1156 operations.RemoveConstraint(
1157 model_name=model_name,
1158 name=constraint.name,
1159 ),
1160 )
1161
1162 @staticmethod
1163 def _get_dependencies_for_foreign_key(
1164 package_label: str, model_name: str, field: Field, project_state: ProjectState
1165 ) -> list[tuple[str, str, str | None, bool | str]]:
1166 remote_field_model = None
1167 if isinstance(field, RelatedField):
1168 remote_field_model = field.remote_field.model
1169 else:
1170 relations = project_state.relations[package_label, model_name]
1171 for (remote_package_label, remote_model_name), fields in relations.items():
1172 if any(
1173 field == related_field.remote_field
1174 for related_field in fields.values()
1175 if isinstance(related_field, RelatedField)
1176 ):
1177 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1178 break
1179 dep_package_label, dep_object_name = resolve_relation(
1180 remote_field_model,
1181 package_label,
1182 model_name,
1183 )
1184 dependencies = [(dep_package_label, dep_object_name, None, True)]
1185 if isinstance(field, RelatedField) and isinstance(
1186 field.remote_field, ManyToManyRel
1187 ):
1188 through_package_label, through_object_name = resolve_relation(
1189 field.remote_field.through,
1190 package_label,
1191 model_name,
1192 )
1193 dependencies.append(
1194 (through_package_label, through_object_name, None, True)
1195 )
1196 return dependencies
1197
1198 def _get_dependencies_for_model(
1199 self, package_label: str, model_name: str
1200 ) -> list[tuple[str, str, str | None, bool | str]]:
1201 """Return foreign key dependencies of the given model."""
1202 dependencies = []
1203 model_state = self.to_state.models[package_label, model_name]
1204 for field in model_state.fields.values():
1205 if isinstance(field, RelatedField):
1206 dependencies.extend(
1207 self._get_dependencies_for_foreign_key(
1208 package_label,
1209 model_name,
1210 field,
1211 self.to_state,
1212 )
1213 )
1214 return dependencies
1215
1216 def generate_altered_db_table(self) -> None:
1217 for package_label, model_name in sorted(self.kept_model_keys):
1218 old_model_name = self.renamed_models.get(
1219 (package_label, model_name), model_name
1220 )
1221 old_model_state = self.from_state.models[package_label, old_model_name]
1222 new_model_state = self.to_state.models[package_label, model_name]
1223 old_db_table_name = old_model_state.options.get("db_table")
1224 new_db_table_name = new_model_state.options.get("db_table")
1225 if old_db_table_name != new_db_table_name:
1226 self.add_operation(
1227 package_label,
1228 operations.AlterModelTable(
1229 name=model_name,
1230 table=new_db_table_name,
1231 ),
1232 )
1233
1234 def generate_altered_options(self) -> None:
1235 """
1236 Work out if any non-schema-affecting options have changed and make an
1237 operation to represent them in state changes (in case Python code in
1238 migrations needs them).
1239 """
1240 for package_label, model_name in sorted(self.kept_model_keys):
1241 old_model_name = self.renamed_models.get(
1242 (package_label, model_name), model_name
1243 )
1244 old_model_state = self.from_state.models[package_label, old_model_name]
1245 new_model_state = self.to_state.models[package_label, model_name]
1246 old_options = {
1247 key: value
1248 for key, value in old_model_state.options.items()
1249 if key in AlterModelOptions.ALTER_OPTION_KEYS
1250 }
1251 new_options = {
1252 key: value
1253 for key, value in new_model_state.options.items()
1254 if key in AlterModelOptions.ALTER_OPTION_KEYS
1255 }
1256 if old_options != new_options:
1257 self.add_operation(
1258 package_label,
1259 operations.AlterModelOptions(
1260 name=model_name,
1261 options=new_options,
1262 ),
1263 )
1264
1265 def arrange_for_graph(
1266 self,
1267 changes: dict[str, list[Migration]],
1268 graph: MigrationGraph,
1269 migration_name: str | None = None,
1270 ) -> dict[str, list[Migration]]:
1271 """
1272 Take a result from changes() and a MigrationGraph, and fix the names
1273 and dependencies of the changes so they extend the graph from the leaf
1274 nodes for each app.
1275 """
1276 leaves = graph.leaf_nodes()
1277 name_map = {}
1278 for package_label, migrations in list(changes.items()):
1279 if not migrations:
1280 continue
1281 # Find the app label's current leaf node
1282 app_leaf = None
1283 for leaf in leaves:
1284 if leaf[0] == package_label:
1285 app_leaf = leaf
1286 break
1287 # Do they want an initial migration for this app?
1288 if app_leaf is None and not self.questioner.ask_initial(package_label):
1289 # They don't.
1290 for migration in migrations:
1291 name_map[(package_label, migration.name)] = (
1292 package_label,
1293 "__first__",
1294 )
1295 del changes[package_label]
1296 continue
1297 # Work out the next number in the sequence
1298 if app_leaf is None:
1299 next_number = 1
1300 else:
1301 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1302 # Name each migration
1303 for i, migration in enumerate(migrations):
1304 if i == 0 and app_leaf:
1305 migration.dependencies.append(app_leaf)
1306 new_name_parts = ["%04i" % next_number] # noqa: UP031
1307 if migration_name:
1308 new_name_parts.append(migration_name)
1309 elif i == 0 and not app_leaf:
1310 new_name_parts.append("initial")
1311 else:
1312 new_name_parts.append(migration.suggest_name()[:100])
1313 new_name = "_".join(new_name_parts)
1314 name_map[(package_label, migration.name)] = (package_label, new_name)
1315 next_number += 1
1316 migration.name = new_name
1317 # Now fix dependencies
1318 for migrations in changes.values():
1319 for migration in migrations:
1320 migration.dependencies = [
1321 name_map.get(d, d) for d in migration.dependencies
1322 ]
1323 return changes
1324
1325 def _trim_to_packages(
1326 self, changes: dict[str, list[Migration]], package_labels: set[str]
1327 ) -> dict[str, list[Migration]]:
1328 """
1329 Take changes from arrange_for_graph() and set of app labels, and return
1330 a modified set of changes which trims out as many migrations that are
1331 not in package_labels as possible. Note that some other migrations may
1332 still be present as they may be required dependencies.
1333 """
1334 # Gather other app dependencies in a first pass
1335 app_dependencies = {}
1336 for package_label, migrations in changes.items():
1337 for migration in migrations:
1338 for dep_package_label, name in migration.dependencies:
1339 app_dependencies.setdefault(package_label, set()).add(
1340 dep_package_label
1341 )
1342 required_packages = set(package_labels)
1343 # Keep resolving till there's no change
1344 old_required_packages = None
1345 while old_required_packages != required_packages:
1346 old_required_packages = set(required_packages)
1347 required_packages.update(
1348 *[
1349 app_dependencies.get(package_label, ())
1350 for package_label in required_packages
1351 ]
1352 )
1353 # Remove all migrations that aren't needed
1354 for package_label in list(changes):
1355 if package_label not in required_packages:
1356 del changes[package_label]
1357 return changes
1358
1359 @classmethod
1360 def parse_number(cls, name: str) -> int | None:
1361 """
1362 Given a migration name, try to extract a number from the beginning of
1363 it. For a squashed migration such as '0001_squashed_0004…', return the
1364 second number. If no number is found, return None.
1365 """
1366 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1367 return int(squashed_match[1])
1368 match = re.match(r"^\d+", name)
1369 if match:
1370 return int(match[0])
1371 return None