1import functools
2import re
3from graphlib import TopologicalSorter
4
5from plain import models
6from plain.models.migrations import operations
7from plain.models.migrations.migration import Migration, SettingsTuple
8from plain.models.migrations.operations.models import AlterModelOptions
9from plain.models.migrations.optimizer import MigrationOptimizer
10from plain.models.migrations.questioner import MigrationQuestioner
11from plain.models.migrations.utils import (
12 COMPILED_REGEX_TYPE,
13 RegexObject,
14 resolve_relation,
15)
16from plain.runtime import settings
17
18
19class MigrationAutodetector:
20 """
21 Take a pair of ProjectStates and compare them to see what the first would
22 need doing to make it match the second (the second usually being the
23 project's current state).
24
25 Note that this naturally operates on entire projects at a time,
26 as it's likely that changes interact (for example, you can't
27 add a ForeignKey without having a migration to add the table it
28 depends on first). A user interface may offer single-app usage
29 if it wishes, with the caveat that it may not always be possible.
30 """
31
32 def __init__(self, from_state, to_state, questioner=None):
33 self.from_state = from_state
34 self.to_state = to_state
35 self.questioner = questioner or MigrationQuestioner()
36 self.existing_packages = {app for app, model in from_state.models}
37
38 def changes(
39 self, graph, trim_to_packages=None, convert_packages=None, migration_name=None
40 ):
41 """
42 Main entry point to produce a list of applicable changes.
43 Take a graph to base names on and an optional set of packages
44 to try and restrict to (restriction is not guaranteed)
45 """
46 changes = self._detect_changes(convert_packages, graph)
47 changes = self.arrange_for_graph(changes, graph, migration_name)
48 if trim_to_packages:
49 changes = self._trim_to_packages(changes, trim_to_packages)
50 return changes
51
52 def deep_deconstruct(self, obj):
53 """
54 Recursive deconstruction for a field and its arguments.
55 Used for full comparison for rename/alter; sometimes a single-level
56 deconstruction will not compare correctly.
57 """
58 if isinstance(obj, list):
59 return [self.deep_deconstruct(value) for value in obj]
60 elif isinstance(obj, tuple):
61 return tuple(self.deep_deconstruct(value) for value in obj)
62 elif isinstance(obj, dict):
63 return {key: self.deep_deconstruct(value) for key, value in obj.items()}
64 elif isinstance(obj, functools.partial):
65 return (
66 obj.func,
67 self.deep_deconstruct(obj.args),
68 self.deep_deconstruct(obj.keywords),
69 )
70 elif isinstance(obj, COMPILED_REGEX_TYPE):
71 return RegexObject(obj)
72 elif isinstance(obj, type):
73 # If this is a type that implements 'deconstruct' as an instance method,
74 # avoid treating this as being deconstructible itself - see #22951
75 return obj
76 elif hasattr(obj, "deconstruct"):
77 deconstructed = obj.deconstruct()
78 if isinstance(obj, models.Field):
79 # we have a field which also returns a name
80 deconstructed = deconstructed[1:]
81 path, args, kwargs = deconstructed
82 return (
83 path,
84 [self.deep_deconstruct(value) for value in args],
85 {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
86 )
87 else:
88 return obj
89
90 def only_relation_agnostic_fields(self, fields):
91 """
92 Return a definition of the fields that ignores field names and
93 what related fields actually relate to. Used for detecting renames (as
94 the related fields change during renames).
95 """
96 fields_def = []
97 for name, field in sorted(fields.items()):
98 deconstruction = self.deep_deconstruct(field)
99 if field.remote_field and field.remote_field.model:
100 deconstruction[2].pop("to", None)
101 fields_def.append(deconstruction)
102 return fields_def
103
104 def _detect_changes(self, convert_packages=None, graph=None):
105 """
106 Return a dict of migration plans which will achieve the
107 change from from_state to to_state. The dict has app labels
108 as keys and a list of migrations as values.
109
110 The resulting migrations aren't specially named, but the names
111 do matter for dependencies inside the set.
112
113 convert_packages is the list of packages to convert to use migrations
114 (i.e. to make initial migrations for, in the usual case)
115
116 graph is an optional argument that, if provided, can help improve
117 dependency generation and avoid potential circular dependencies.
118 """
119 # The first phase is generating all the operations for each app
120 # and gathering them into a big per-app list.
121 # Then go through that list, order it, and split into migrations to
122 # resolve dependencies caused by M2Ms and FKs.
123 self.generated_operations = {}
124 self.altered_indexes = {}
125 self.altered_constraints = {}
126 self.renamed_fields = {}
127
128 # Prepare some old/new state and model lists, ignoring unmigrated packages.
129 self.old_model_keys = set()
130 self.new_model_keys = set()
131 for (package_label, model_name), model_state in self.from_state.models.items():
132 if package_label not in self.from_state.real_packages:
133 self.old_model_keys.add((package_label, model_name))
134
135 for (package_label, model_name), model_state in self.to_state.models.items():
136 if package_label not in self.from_state.real_packages or (
137 convert_packages and package_label in convert_packages
138 ):
139 self.new_model_keys.add((package_label, model_name))
140
141 self.from_state.resolve_fields_and_relations()
142 self.to_state.resolve_fields_and_relations()
143
144 # Renames have to come first
145 self.generate_renamed_models()
146
147 # Prepare lists of fields and generate through model map
148 self._prepare_field_lists()
149 self._generate_through_model_map()
150
151 # Generate non-rename model operations
152 self.generate_deleted_models()
153 self.generate_created_models()
154 self.generate_altered_options()
155 self.generate_altered_managers()
156 self.generate_altered_db_table_comment()
157
158 # Create the renamed fields and store them in self.renamed_fields.
159 # They are used by create_altered_indexes(), generate_altered_fields(),
160 # generate_removed_altered_index(), and
161 # generate_altered_index().
162 self.create_renamed_fields()
163 # Create the altered indexes and store them in self.altered_indexes.
164 # This avoids the same computation in generate_removed_indexes()
165 # and generate_added_indexes().
166 self.create_altered_indexes()
167 self.create_altered_constraints()
168 # Generate index removal operations before field is removed
169 self.generate_removed_constraints()
170 self.generate_removed_indexes()
171 # Generate field renaming operations.
172 self.generate_renamed_fields()
173 self.generate_renamed_indexes()
174 # Generate field operations.
175 self.generate_removed_fields()
176 self.generate_added_fields()
177 self.generate_altered_fields()
178 self.generate_added_indexes()
179 self.generate_added_constraints()
180 self.generate_altered_db_table()
181
182 self._sort_migrations()
183 self._build_migration_list(graph)
184 self._optimize_migrations()
185
186 return self.migrations
187
188 def _prepare_field_lists(self):
189 """
190 Prepare field lists and a list of the fields that used through models
191 in the old state so dependencies can be made from the through model
192 deletion to the field that uses it.
193 """
194 self.kept_model_keys = self.old_model_keys & self.new_model_keys
195 self.through_users = {}
196 self.old_field_keys = {
197 (package_label, model_name, field_name)
198 for package_label, model_name in self.kept_model_keys
199 for field_name in self.from_state.models[
200 package_label,
201 self.renamed_models.get((package_label, model_name), model_name),
202 ].fields
203 }
204 self.new_field_keys = {
205 (package_label, model_name, field_name)
206 for package_label, model_name in self.kept_model_keys
207 for field_name in self.to_state.models[package_label, model_name].fields
208 }
209
210 def _generate_through_model_map(self):
211 """Through model map generation."""
212 for package_label, model_name in sorted(self.old_model_keys):
213 old_model_name = self.renamed_models.get(
214 (package_label, model_name), model_name
215 )
216 old_model_state = self.from_state.models[package_label, old_model_name]
217 for field_name, field in old_model_state.fields.items():
218 if hasattr(field, "remote_field") and getattr(
219 field.remote_field, "through", None
220 ):
221 through_key = resolve_relation(
222 field.remote_field.through, package_label, model_name
223 )
224 self.through_users[through_key] = (
225 package_label,
226 old_model_name,
227 field_name,
228 )
229
230 @staticmethod
231 def _resolve_dependency(dependency):
232 """
233 Return the resolved dependency and a boolean denoting whether or not
234 it was a settings dependency.
235 """
236 if not isinstance(dependency, SettingsTuple):
237 return dependency, False
238 resolved_package_label, resolved_object_name = getattr(
239 settings, dependency[1]
240 ).split(".")
241 return (resolved_package_label, resolved_object_name.lower()) + dependency[
242 2:
243 ], True
244
245 def _build_migration_list(self, graph=None):
246 """
247 Chop the lists of operations up into migrations with dependencies on
248 each other. Do this by going through an app's list of operations until
249 one is found that has an outgoing dependency that isn't in another
250 app's migration yet (hasn't been chopped off its list). Then chop off
251 the operations before it into a migration and move onto the next app.
252 If the loops completes without doing anything, there's a circular
253 dependency (which _should_ be impossible as the operations are
254 all split at this point so they can't depend and be depended on).
255 """
256 self.migrations = {}
257 num_ops = sum(len(x) for x in self.generated_operations.values())
258 chop_mode = False
259 while num_ops:
260 # On every iteration, we step through all the packages and see if there
261 # is a completed set of operations.
262 # If we find that a subset of the operations are complete we can
263 # try to chop it off from the rest and continue, but we only
264 # do this if we've already been through the list once before
265 # without any chopping and nothing has changed.
266 for package_label in sorted(self.generated_operations):
267 chopped = []
268 dependencies = set()
269 for operation in list(self.generated_operations[package_label]):
270 deps_satisfied = True
271 operation_dependencies = set()
272 for dep in operation._auto_deps:
273 # Temporarily resolve the settings dependency to
274 # prevent circular references. While keeping the
275 # dependency checks on the resolved model, add the
276 # settings dependencies.
277 original_dep = dep
278 dep, is_settings_dep = self._resolve_dependency(dep)
279 if dep[0] != package_label:
280 # External app dependency. See if it's not yet
281 # satisfied.
282 for other_operation in self.generated_operations.get(
283 dep[0], []
284 ):
285 if self.check_dependency(other_operation, dep):
286 deps_satisfied = False
287 break
288 if not deps_satisfied:
289 break
290 else:
291 if is_settings_dep:
292 operation_dependencies.add(
293 (original_dep[0], original_dep[1])
294 )
295 elif dep[0] in self.migrations:
296 operation_dependencies.add(
297 (dep[0], self.migrations[dep[0]][-1].name)
298 )
299 else:
300 # If we can't find the other app, we add a
301 # first/last dependency, but only if we've
302 # already been through once and checked
303 # everything.
304 if chop_mode:
305 # If the app already exists, we add a
306 # dependency on the last migration, as
307 # we don't know which migration
308 # contains the target field. If it's
309 # not yet migrated or has no
310 # migrations, we use __first__.
311 if graph and graph.leaf_nodes(dep[0]):
312 operation_dependencies.add(
313 graph.leaf_nodes(dep[0])[0]
314 )
315 else:
316 operation_dependencies.add(
317 (dep[0], "__first__")
318 )
319 else:
320 deps_satisfied = False
321 if deps_satisfied:
322 chopped.append(operation)
323 dependencies.update(operation_dependencies)
324 del self.generated_operations[package_label][0]
325 else:
326 break
327 # Make a migration! Well, only if there's stuff to put in it
328 if dependencies or chopped:
329 if not self.generated_operations[package_label] or chop_mode:
330 subclass = type(
331 "Migration",
332 (Migration,),
333 {"operations": [], "dependencies": []},
334 )
335 instance = subclass(
336 "auto_%i" # noqa: UP031
337 % (len(self.migrations.get(package_label, [])) + 1),
338 package_label,
339 )
340 instance.dependencies = list(dependencies)
341 instance.operations = chopped
342 instance.initial = package_label not in self.existing_packages
343 self.migrations.setdefault(package_label, []).append(instance)
344 chop_mode = False
345 else:
346 self.generated_operations[package_label] = (
347 chopped + self.generated_operations[package_label]
348 )
349 new_num_ops = sum(len(x) for x in self.generated_operations.values())
350 if new_num_ops == num_ops:
351 if not chop_mode:
352 chop_mode = True
353 else:
354 raise ValueError(
355 f"Cannot resolve operation dependencies: {self.generated_operations!r}"
356 )
357 num_ops = new_num_ops
358
359 def _sort_migrations(self):
360 """
361 Reorder to make things possible. Reordering may be needed so FKs work
362 nicely inside the same app.
363 """
364 for package_label, ops in sorted(self.generated_operations.items()):
365 ts = TopologicalSorter()
366 for op in ops:
367 ts.add(op)
368 for dep in op._auto_deps:
369 # Resolve intra-app dependencies to handle circular
370 # references involving a settings model.
371 dep = self._resolve_dependency(dep)[0]
372 if dep[0] != package_label:
373 continue
374 ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
375 self.generated_operations[package_label] = list(ts.static_order())
376
377 def _optimize_migrations(self):
378 # Add in internal dependencies among the migrations
379 for package_label, migrations in self.migrations.items():
380 for m1, m2 in zip(migrations, migrations[1:]):
381 m2.dependencies.append((package_label, m1.name))
382
383 # De-dupe dependencies
384 for migrations in self.migrations.values():
385 for migration in migrations:
386 migration.dependencies = list(set(migration.dependencies))
387
388 # Optimize migrations
389 for package_label, migrations in self.migrations.items():
390 for migration in migrations:
391 migration.operations = MigrationOptimizer().optimize(
392 migration.operations, package_label
393 )
394
395 def check_dependency(self, operation, dependency):
396 """
397 Return True if the given operation depends on the given dependency,
398 False otherwise.
399 """
400 # Created model
401 if dependency[2] is None and dependency[3] is True:
402 return (
403 isinstance(operation, operations.CreateModel)
404 and operation.name_lower == dependency[1].lower()
405 )
406 # Created field
407 elif dependency[2] is not None and dependency[3] is True:
408 return (
409 isinstance(operation, operations.CreateModel)
410 and operation.name_lower == dependency[1].lower()
411 and any(dependency[2] == x for x, y in operation.fields)
412 ) or (
413 isinstance(operation, operations.AddField)
414 and operation.model_name_lower == dependency[1].lower()
415 and operation.name_lower == dependency[2].lower()
416 )
417 # Removed field
418 elif dependency[2] is not None and dependency[3] is False:
419 return (
420 isinstance(operation, operations.RemoveField)
421 and operation.model_name_lower == dependency[1].lower()
422 and operation.name_lower == dependency[2].lower()
423 )
424 # Removed model
425 elif dependency[2] is None and dependency[3] is False:
426 return (
427 isinstance(operation, operations.DeleteModel)
428 and operation.name_lower == dependency[1].lower()
429 )
430 # Field being altered
431 elif dependency[2] is not None and dependency[3] == "alter":
432 return (
433 isinstance(operation, operations.AlterField)
434 and operation.model_name_lower == dependency[1].lower()
435 and operation.name_lower == dependency[2].lower()
436 )
437 # Unknown dependency. Raise an error.
438 else:
439 raise ValueError(f"Can't handle dependency {dependency!r}")
440
441 def add_operation(
442 self, package_label, operation, dependencies=None, beginning=False
443 ):
444 # Dependencies are
445 # (package_label, model_name, field_name, create/delete as True/False)
446 operation._auto_deps = dependencies or []
447 if beginning:
448 self.generated_operations.setdefault(package_label, []).insert(0, operation)
449 else:
450 self.generated_operations.setdefault(package_label, []).append(operation)
451
452 def generate_renamed_models(self):
453 """
454 Find any renamed models, generate the operations for them, and remove
455 the old entry from the model lists. Must be run before other
456 model-level generation.
457 """
458 self.renamed_models = {}
459 self.renamed_models_rel = {}
460 added_models = self.new_model_keys - self.old_model_keys
461 for package_label, model_name in sorted(added_models):
462 model_state = self.to_state.models[package_label, model_name]
463 model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
464
465 removed_models = self.old_model_keys - self.new_model_keys
466 for rem_package_label, rem_model_name in removed_models:
467 if rem_package_label == package_label:
468 rem_model_state = self.from_state.models[
469 rem_package_label, rem_model_name
470 ]
471 rem_model_fields_def = self.only_relation_agnostic_fields(
472 rem_model_state.fields
473 )
474 if model_fields_def == rem_model_fields_def:
475 if self.questioner.ask_rename_model(
476 rem_model_state, model_state
477 ):
478 dependencies = []
479 fields = list(model_state.fields.values()) + [
480 field.remote_field
481 for relations in self.to_state.relations[
482 package_label, model_name
483 ].values()
484 for field in relations.values()
485 ]
486 for field in fields:
487 if field.is_relation:
488 dependencies.extend(
489 self._get_dependencies_for_foreign_key(
490 package_label,
491 model_name,
492 field,
493 self.to_state,
494 )
495 )
496 self.add_operation(
497 package_label,
498 operations.RenameModel(
499 old_name=rem_model_state.name,
500 new_name=model_state.name,
501 ),
502 dependencies=dependencies,
503 )
504 self.renamed_models[package_label, model_name] = (
505 rem_model_name
506 )
507 renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
508 self.renamed_models_rel[renamed_models_rel_key] = (
509 f"{model_state.package_label}.{model_state.name_lower}"
510 )
511 self.old_model_keys.remove(
512 (rem_package_label, rem_model_name)
513 )
514 self.old_model_keys.add((package_label, model_name))
515 break
516
517 def generate_created_models(self):
518 """
519 Find all new models and make create
520 operations for them as well as separate operations to create any
521 foreign key or M2M relationships (these are optimized later, if
522 possible).
523
524 Defer any model options that refer to collections of fields that might
525 be deferred.
526 """
527 added_models = self.new_model_keys - self.old_model_keys
528
529 for package_label, model_name in added_models:
530 model_state = self.to_state.models[package_label, model_name]
531 # Gather related fields
532 related_fields = {}
533 primary_key_rel = None
534 for field_name, field in model_state.fields.items():
535 if field.remote_field:
536 if field.remote_field.model:
537 if field.primary_key:
538 primary_key_rel = field.remote_field.model
539 elif not field.remote_field.parent_link:
540 related_fields[field_name] = field
541 if getattr(field.remote_field, "through", None):
542 related_fields[field_name] = field
543
544 # Are there indexes to defer?
545 indexes = model_state.options.pop("indexes")
546 constraints = model_state.options.pop("constraints")
547 # Depend on the deletion of any possible proxy version of us
548 dependencies = [
549 (package_label, model_name, None, False),
550 ]
551 # Depend on all bases
552 for base in model_state.bases:
553 if isinstance(base, str) and "." in base:
554 base_package_label, base_name = base.split(".", 1)
555 dependencies.append((base_package_label, base_name, None, True))
556 # Depend on the removal of base fields if the new model has
557 # a field with the same name.
558 old_base_model_state = self.from_state.models.get(
559 (base_package_label, base_name)
560 )
561 new_base_model_state = self.to_state.models.get(
562 (base_package_label, base_name)
563 )
564 if old_base_model_state and new_base_model_state:
565 removed_base_fields = (
566 set(old_base_model_state.fields)
567 .difference(
568 new_base_model_state.fields,
569 )
570 .intersection(model_state.fields)
571 )
572 for removed_base_field in removed_base_fields:
573 dependencies.append(
574 (
575 base_package_label,
576 base_name,
577 removed_base_field,
578 False,
579 )
580 )
581 # Depend on the other end of the primary key if it's a relation
582 if primary_key_rel:
583 dependencies.append(
584 resolve_relation(
585 primary_key_rel,
586 package_label,
587 model_name,
588 )
589 + (None, True)
590 )
591 # Generate creation operation
592 self.add_operation(
593 package_label,
594 operations.CreateModel(
595 name=model_state.name,
596 fields=[
597 d
598 for d in model_state.fields.items()
599 if d[0] not in related_fields
600 ],
601 options=model_state.options,
602 bases=model_state.bases,
603 managers=model_state.managers,
604 ),
605 dependencies=dependencies,
606 beginning=True,
607 )
608
609 # Generate operations for each related field
610 for name, field in sorted(related_fields.items()):
611 dependencies = self._get_dependencies_for_foreign_key(
612 package_label,
613 model_name,
614 field,
615 self.to_state,
616 )
617 # Depend on our own model being created
618 dependencies.append((package_label, model_name, None, True))
619 # Make operation
620 self.add_operation(
621 package_label,
622 operations.AddField(
623 model_name=model_name,
624 name=name,
625 field=field,
626 ),
627 dependencies=list(set(dependencies)),
628 )
629
630 related_dependencies = [
631 (package_label, model_name, name, True)
632 for name in sorted(related_fields)
633 ]
634 related_dependencies.append((package_label, model_name, None, True))
635 for index in indexes:
636 self.add_operation(
637 package_label,
638 operations.AddIndex(
639 model_name=model_name,
640 index=index,
641 ),
642 dependencies=related_dependencies,
643 )
644 for constraint in constraints:
645 self.add_operation(
646 package_label,
647 operations.AddConstraint(
648 model_name=model_name,
649 constraint=constraint,
650 ),
651 dependencies=related_dependencies,
652 )
653
654 def generate_deleted_models(self):
655 """
656 Find all deleted models and make delete
657 operations for them as well as separate operations to delete any
658 foreign key or M2M relationships (these are optimized later, if
659 possible).
660
661 Also bring forward removal of any model options that refer to
662 collections of fields - the inverse of generate_created_models().
663 """
664 deleted_models = self.old_model_keys - self.new_model_keys
665
666 for package_label, model_name in sorted(deleted_models):
667 model_state = self.from_state.models[package_label, model_name]
668 # Gather related fields
669 related_fields = {}
670 for field_name, field in model_state.fields.items():
671 if field.remote_field:
672 if field.remote_field.model:
673 related_fields[field_name] = field
674 if getattr(field.remote_field, "through", None):
675 related_fields[field_name] = field
676
677 # Then remove each related field
678 for name in sorted(related_fields):
679 self.add_operation(
680 package_label,
681 operations.RemoveField(
682 model_name=model_name,
683 name=name,
684 ),
685 )
686 # Finally, remove the model.
687 # This depends on both the removal/alteration of all incoming fields
688 # and the removal of all its own related fields, and if it's
689 # a through model the field that references it.
690 dependencies = []
691 relations = self.from_state.relations
692 for (
693 related_object_package_label,
694 object_name,
695 ), relation_related_fields in relations[package_label, model_name].items():
696 for field_name, field in relation_related_fields.items():
697 dependencies.append(
698 (related_object_package_label, object_name, field_name, False),
699 )
700 if not field.many_to_many:
701 dependencies.append(
702 (
703 related_object_package_label,
704 object_name,
705 field_name,
706 "alter",
707 ),
708 )
709
710 for name in sorted(related_fields):
711 dependencies.append((package_label, model_name, name, False))
712 # We're referenced in another field's through=
713 through_user = self.through_users.get(
714 (package_label, model_state.name_lower)
715 )
716 if through_user:
717 dependencies.append(
718 (through_user[0], through_user[1], through_user[2], False)
719 )
720 # Finally, make the operation, deduping any dependencies
721 self.add_operation(
722 package_label,
723 operations.DeleteModel(
724 name=model_state.name,
725 ),
726 dependencies=list(set(dependencies)),
727 )
728
729 def create_renamed_fields(self):
730 """Work out renamed fields."""
731 self.renamed_operations = []
732 old_field_keys = self.old_field_keys.copy()
733 for package_label, model_name, field_name in sorted(
734 self.new_field_keys - old_field_keys
735 ):
736 old_model_name = self.renamed_models.get(
737 (package_label, model_name), model_name
738 )
739 old_model_state = self.from_state.models[package_label, old_model_name]
740 new_model_state = self.to_state.models[package_label, model_name]
741 field = new_model_state.get_field(field_name)
742 # Scan to see if this is actually a rename!
743 field_dec = self.deep_deconstruct(field)
744 for rem_package_label, rem_model_name, rem_field_name in sorted(
745 old_field_keys - self.new_field_keys
746 ):
747 if rem_package_label == package_label and rem_model_name == model_name:
748 old_field = old_model_state.get_field(rem_field_name)
749 old_field_dec = self.deep_deconstruct(old_field)
750 if (
751 field.remote_field
752 and field.remote_field.model
753 and "to" in old_field_dec[2]
754 ):
755 old_rel_to = old_field_dec[2]["to"]
756 if old_rel_to in self.renamed_models_rel:
757 old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
758 old_field.set_attributes_from_name(rem_field_name)
759 old_db_column = old_field.get_attname_column()[1]
760 if old_field_dec == field_dec or (
761 # Was the field renamed and db_column equal to the
762 # old field's column added?
763 old_field_dec[0:2] == field_dec[0:2]
764 and dict(old_field_dec[2], db_column=old_db_column)
765 == field_dec[2]
766 ):
767 if self.questioner.ask_rename(
768 model_name, rem_field_name, field_name, field
769 ):
770 self.renamed_operations.append(
771 (
772 rem_package_label,
773 rem_model_name,
774 old_field.db_column,
775 rem_field_name,
776 package_label,
777 model_name,
778 field,
779 field_name,
780 )
781 )
782 old_field_keys.remove(
783 (rem_package_label, rem_model_name, rem_field_name)
784 )
785 old_field_keys.add((package_label, model_name, field_name))
786 self.renamed_fields[
787 package_label, model_name, field_name
788 ] = rem_field_name
789 break
790
791 def generate_renamed_fields(self):
792 """Generate RenameField operations."""
793 for (
794 rem_package_label,
795 rem_model_name,
796 rem_db_column,
797 rem_field_name,
798 package_label,
799 model_name,
800 field,
801 field_name,
802 ) in self.renamed_operations:
803 # A db_column mismatch requires a prior noop AlterField for the
804 # subsequent RenameField to be a noop on attempts at preserving the
805 # old name.
806 if rem_db_column != field.db_column:
807 altered_field = field.clone()
808 altered_field.name = rem_field_name
809 self.add_operation(
810 package_label,
811 operations.AlterField(
812 model_name=model_name,
813 name=rem_field_name,
814 field=altered_field,
815 ),
816 )
817 self.add_operation(
818 package_label,
819 operations.RenameField(
820 model_name=model_name,
821 old_name=rem_field_name,
822 new_name=field_name,
823 ),
824 )
825 self.old_field_keys.remove(
826 (rem_package_label, rem_model_name, rem_field_name)
827 )
828 self.old_field_keys.add((package_label, model_name, field_name))
829
830 def generate_added_fields(self):
831 """Make AddField operations."""
832 for package_label, model_name, field_name in sorted(
833 self.new_field_keys - self.old_field_keys
834 ):
835 self._generate_added_field(package_label, model_name, field_name)
836
837 def _generate_added_field(self, package_label, model_name, field_name):
838 field = self.to_state.models[package_label, model_name].get_field(field_name)
839 # Adding a field always depends at least on its removal.
840 dependencies = [(package_label, model_name, field_name, False)]
841 # Fields that are foreignkeys/m2ms depend on stuff.
842 if field.remote_field and field.remote_field.model:
843 dependencies.extend(
844 self._get_dependencies_for_foreign_key(
845 package_label,
846 model_name,
847 field,
848 self.to_state,
849 )
850 )
851 # You can't just add NOT NULL fields with no default or fields
852 # which don't allow empty strings as default.
853 time_fields = (models.DateField, models.DateTimeField, models.TimeField)
854 preserve_default = (
855 field.allow_null
856 or field.has_default()
857 or field.many_to_many
858 or (not field.required and field.empty_strings_allowed)
859 or (isinstance(field, time_fields) and field.auto_now)
860 )
861 if not preserve_default:
862 field = field.clone()
863 if isinstance(field, time_fields) and field.auto_now_add:
864 field.default = self.questioner.ask_auto_now_add_addition(
865 field_name, model_name
866 )
867 else:
868 field.default = self.questioner.ask_not_null_addition(
869 field_name, model_name
870 )
871 if (
872 field.primary_key
873 and field.default is not models.NOT_PROVIDED
874 and callable(field.default)
875 ):
876 self.questioner.ask_unique_callable_default_addition(field_name, model_name)
877 self.add_operation(
878 package_label,
879 operations.AddField(
880 model_name=model_name,
881 name=field_name,
882 field=field,
883 preserve_default=preserve_default,
884 ),
885 dependencies=dependencies,
886 )
887
888 def generate_removed_fields(self):
889 """Make RemoveField operations."""
890 for package_label, model_name, field_name in sorted(
891 self.old_field_keys - self.new_field_keys
892 ):
893 self._generate_removed_field(package_label, model_name, field_name)
894
895 def _generate_removed_field(self, package_label, model_name, field_name):
896 self.add_operation(
897 package_label,
898 operations.RemoveField(
899 model_name=model_name,
900 name=field_name,
901 ),
902 )
903
904 def generate_altered_fields(self):
905 """
906 Make AlterField operations, or possibly RemovedField/AddField if alter
907 isn't possible.
908 """
909 for package_label, model_name, field_name in sorted(
910 self.old_field_keys & self.new_field_keys
911 ):
912 # Did the field change?
913 old_model_name = self.renamed_models.get(
914 (package_label, model_name), model_name
915 )
916 old_field_name = self.renamed_fields.get(
917 (package_label, model_name, field_name), field_name
918 )
919 old_field = self.from_state.models[package_label, old_model_name].get_field(
920 old_field_name
921 )
922 new_field = self.to_state.models[package_label, model_name].get_field(
923 field_name
924 )
925 dependencies = []
926 # Implement any model renames on relations; these are handled by RenameModel
927 # so we need to exclude them from the comparison
928 if hasattr(new_field, "remote_field") and getattr(
929 new_field.remote_field, "model", None
930 ):
931 rename_key = resolve_relation(
932 new_field.remote_field.model, package_label, model_name
933 )
934 if rename_key in self.renamed_models:
935 new_field.remote_field.model = old_field.remote_field.model
936 # Handle ForeignKey which can only have a single to_field.
937 remote_field_name = getattr(new_field.remote_field, "field_name", None)
938 if remote_field_name:
939 to_field_rename_key = rename_key + (remote_field_name,)
940 if to_field_rename_key in self.renamed_fields:
941 # Repoint both model and field name because to_field
942 # inclusion in ForeignKey.deconstruct() is based on
943 # both.
944 new_field.remote_field.model = old_field.remote_field.model
945 new_field.remote_field.field_name = (
946 old_field.remote_field.field_name
947 )
948 # Handle ForeignObjects which can have multiple from_fields/to_fields.
949 from_fields = getattr(new_field, "from_fields", None)
950 if from_fields:
951 from_rename_key = (package_label, model_name)
952 new_field.from_fields = tuple(
953 [
954 self.renamed_fields.get(
955 from_rename_key + (from_field,), from_field
956 )
957 for from_field in from_fields
958 ]
959 )
960 new_field.to_fields = tuple(
961 [
962 self.renamed_fields.get(rename_key + (to_field,), to_field)
963 for to_field in new_field.to_fields
964 ]
965 )
966 dependencies.extend(
967 self._get_dependencies_for_foreign_key(
968 package_label,
969 model_name,
970 new_field,
971 self.to_state,
972 )
973 )
974 if hasattr(new_field, "remote_field") and getattr(
975 new_field.remote_field, "through", None
976 ):
977 rename_key = resolve_relation(
978 new_field.remote_field.through, package_label, model_name
979 )
980 if rename_key in self.renamed_models:
981 new_field.remote_field.through = old_field.remote_field.through
982 old_field_dec = self.deep_deconstruct(old_field)
983 new_field_dec = self.deep_deconstruct(new_field)
984 # If the field was confirmed to be renamed it means that only
985 # db_column was allowed to change which generate_renamed_fields()
986 # already accounts for by adding an AlterField operation.
987 if old_field_dec != new_field_dec and old_field_name == field_name:
988 both_m2m = old_field.many_to_many and new_field.many_to_many
989 neither_m2m = not old_field.many_to_many and not new_field.many_to_many
990 if both_m2m or neither_m2m:
991 # Either both fields are m2m or neither is
992 preserve_default = True
993 if (
994 old_field.allow_null
995 and not new_field.allow_null
996 and not new_field.has_default()
997 and not new_field.many_to_many
998 ):
999 field = new_field.clone()
1000 new_default = self.questioner.ask_not_null_alteration(
1001 field_name, model_name
1002 )
1003 if new_default is not models.NOT_PROVIDED:
1004 field.default = new_default
1005 preserve_default = False
1006 else:
1007 field = new_field
1008 self.add_operation(
1009 package_label,
1010 operations.AlterField(
1011 model_name=model_name,
1012 name=field_name,
1013 field=field,
1014 preserve_default=preserve_default,
1015 ),
1016 dependencies=dependencies,
1017 )
1018 else:
1019 # We cannot alter between m2m and concrete fields
1020 self._generate_removed_field(package_label, model_name, field_name)
1021 self._generate_added_field(package_label, model_name, field_name)
1022
1023 def create_altered_indexes(self):
1024 option_name = operations.AddIndex.option_name
1025
1026 for package_label, model_name in sorted(self.kept_model_keys):
1027 old_model_name = self.renamed_models.get(
1028 (package_label, model_name), model_name
1029 )
1030 old_model_state = self.from_state.models[package_label, old_model_name]
1031 new_model_state = self.to_state.models[package_label, model_name]
1032
1033 old_indexes = old_model_state.options[option_name]
1034 new_indexes = new_model_state.options[option_name]
1035 added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1036 removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1037 renamed_indexes = []
1038 # Find renamed indexes.
1039 remove_from_added = []
1040 remove_from_removed = []
1041 for new_index in added_indexes:
1042 new_index_dec = new_index.deconstruct()
1043 new_index_name = new_index_dec[2].pop("name")
1044 for old_index in removed_indexes:
1045 old_index_dec = old_index.deconstruct()
1046 old_index_name = old_index_dec[2].pop("name")
1047 # Indexes are the same except for the names.
1048 if (
1049 new_index_dec == old_index_dec
1050 and new_index_name != old_index_name
1051 ):
1052 renamed_indexes.append((old_index_name, new_index_name, None))
1053 remove_from_added.append(new_index)
1054 remove_from_removed.append(old_index)
1055
1056 # Remove renamed indexes from the lists of added and removed
1057 # indexes.
1058 added_indexes = [
1059 idx for idx in added_indexes if idx not in remove_from_added
1060 ]
1061 removed_indexes = [
1062 idx for idx in removed_indexes if idx not in remove_from_removed
1063 ]
1064
1065 self.altered_indexes.update(
1066 {
1067 (package_label, model_name): {
1068 "added_indexes": added_indexes,
1069 "removed_indexes": removed_indexes,
1070 "renamed_indexes": renamed_indexes,
1071 }
1072 }
1073 )
1074
1075 def generate_added_indexes(self):
1076 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1077 dependencies = self._get_dependencies_for_model(package_label, model_name)
1078 for index in alt_indexes["added_indexes"]:
1079 self.add_operation(
1080 package_label,
1081 operations.AddIndex(
1082 model_name=model_name,
1083 index=index,
1084 ),
1085 dependencies=dependencies,
1086 )
1087
1088 def generate_removed_indexes(self):
1089 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1090 for index in alt_indexes["removed_indexes"]:
1091 self.add_operation(
1092 package_label,
1093 operations.RemoveIndex(
1094 model_name=model_name,
1095 name=index.name,
1096 ),
1097 )
1098
1099 def generate_renamed_indexes(self):
1100 for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1101 for old_index_name, new_index_name, old_fields in alt_indexes[
1102 "renamed_indexes"
1103 ]:
1104 self.add_operation(
1105 package_label,
1106 operations.RenameIndex(
1107 model_name=model_name,
1108 new_name=new_index_name,
1109 old_name=old_index_name,
1110 old_fields=old_fields,
1111 ),
1112 )
1113
1114 def create_altered_constraints(self):
1115 option_name = operations.AddConstraint.option_name
1116 for package_label, model_name in sorted(self.kept_model_keys):
1117 old_model_name = self.renamed_models.get(
1118 (package_label, model_name), model_name
1119 )
1120 old_model_state = self.from_state.models[package_label, old_model_name]
1121 new_model_state = self.to_state.models[package_label, model_name]
1122
1123 old_constraints = old_model_state.options[option_name]
1124 new_constraints = new_model_state.options[option_name]
1125 add_constraints = [c for c in new_constraints if c not in old_constraints]
1126 rem_constraints = [c for c in old_constraints if c not in new_constraints]
1127
1128 self.altered_constraints.update(
1129 {
1130 (package_label, model_name): {
1131 "added_constraints": add_constraints,
1132 "removed_constraints": rem_constraints,
1133 }
1134 }
1135 )
1136
1137 def generate_added_constraints(self):
1138 for (
1139 package_label,
1140 model_name,
1141 ), alt_constraints in self.altered_constraints.items():
1142 dependencies = self._get_dependencies_for_model(package_label, model_name)
1143 for constraint in alt_constraints["added_constraints"]:
1144 self.add_operation(
1145 package_label,
1146 operations.AddConstraint(
1147 model_name=model_name,
1148 constraint=constraint,
1149 ),
1150 dependencies=dependencies,
1151 )
1152
1153 def generate_removed_constraints(self):
1154 for (
1155 package_label,
1156 model_name,
1157 ), alt_constraints in self.altered_constraints.items():
1158 for constraint in alt_constraints["removed_constraints"]:
1159 self.add_operation(
1160 package_label,
1161 operations.RemoveConstraint(
1162 model_name=model_name,
1163 name=constraint.name,
1164 ),
1165 )
1166
1167 @staticmethod
1168 def _get_dependencies_for_foreign_key(
1169 package_label, model_name, field, project_state
1170 ):
1171 remote_field_model = None
1172 if hasattr(field.remote_field, "model"):
1173 remote_field_model = field.remote_field.model
1174 else:
1175 relations = project_state.relations[package_label, model_name]
1176 for (remote_package_label, remote_model_name), fields in relations.items():
1177 if any(
1178 field == related_field.remote_field
1179 for related_field in fields.values()
1180 ):
1181 remote_field_model = f"{remote_package_label}.{remote_model_name}"
1182 break
1183 dep_package_label, dep_object_name = resolve_relation(
1184 remote_field_model,
1185 package_label,
1186 model_name,
1187 )
1188 dependencies = [(dep_package_label, dep_object_name, None, True)]
1189 if getattr(field.remote_field, "through", None):
1190 through_package_label, through_object_name = resolve_relation(
1191 field.remote_field.through,
1192 package_label,
1193 model_name,
1194 )
1195 dependencies.append(
1196 (through_package_label, through_object_name, None, True)
1197 )
1198 return dependencies
1199
1200 def _get_dependencies_for_model(self, package_label, model_name):
1201 """Return foreign key dependencies of the given model."""
1202 dependencies = []
1203 model_state = self.to_state.models[package_label, model_name]
1204 for field in model_state.fields.values():
1205 if field.is_relation:
1206 dependencies.extend(
1207 self._get_dependencies_for_foreign_key(
1208 package_label,
1209 model_name,
1210 field,
1211 self.to_state,
1212 )
1213 )
1214 return dependencies
1215
1216 def generate_altered_db_table(self):
1217 for package_label, model_name in sorted(self.kept_model_keys):
1218 old_model_name = self.renamed_models.get(
1219 (package_label, model_name), model_name
1220 )
1221 old_model_state = self.from_state.models[package_label, old_model_name]
1222 new_model_state = self.to_state.models[package_label, model_name]
1223 old_db_table_name = old_model_state.options.get("db_table")
1224 new_db_table_name = new_model_state.options.get("db_table")
1225 if old_db_table_name != new_db_table_name:
1226 self.add_operation(
1227 package_label,
1228 operations.AlterModelTable(
1229 name=model_name,
1230 table=new_db_table_name,
1231 ),
1232 )
1233
1234 def generate_altered_db_table_comment(self):
1235 for package_label, model_name in sorted(self.kept_model_keys):
1236 old_model_name = self.renamed_models.get(
1237 (package_label, model_name), model_name
1238 )
1239 old_model_state = self.from_state.models[package_label, old_model_name]
1240 new_model_state = self.to_state.models[package_label, model_name]
1241
1242 old_db_table_comment = old_model_state.options.get("db_table_comment")
1243 new_db_table_comment = new_model_state.options.get("db_table_comment")
1244 if old_db_table_comment != new_db_table_comment:
1245 self.add_operation(
1246 package_label,
1247 operations.AlterModelTableComment(
1248 name=model_name,
1249 table_comment=new_db_table_comment,
1250 ),
1251 )
1252
1253 def generate_altered_options(self):
1254 """
1255 Work out if any non-schema-affecting options have changed and make an
1256 operation to represent them in state changes (in case Python code in
1257 migrations needs them).
1258 """
1259 for package_label, model_name in sorted(self.kept_model_keys):
1260 old_model_name = self.renamed_models.get(
1261 (package_label, model_name), model_name
1262 )
1263 old_model_state = self.from_state.models[package_label, old_model_name]
1264 new_model_state = self.to_state.models[package_label, model_name]
1265 old_options = {
1266 key: value
1267 for key, value in old_model_state.options.items()
1268 if key in AlterModelOptions.ALTER_OPTION_KEYS
1269 }
1270 new_options = {
1271 key: value
1272 for key, value in new_model_state.options.items()
1273 if key in AlterModelOptions.ALTER_OPTION_KEYS
1274 }
1275 if old_options != new_options:
1276 self.add_operation(
1277 package_label,
1278 operations.AlterModelOptions(
1279 name=model_name,
1280 options=new_options,
1281 ),
1282 )
1283
1284 def generate_altered_managers(self):
1285 for package_label, model_name in sorted(self.kept_model_keys):
1286 old_model_name = self.renamed_models.get(
1287 (package_label, model_name), model_name
1288 )
1289 old_model_state = self.from_state.models[package_label, old_model_name]
1290 new_model_state = self.to_state.models[package_label, model_name]
1291 if old_model_state.managers != new_model_state.managers:
1292 self.add_operation(
1293 package_label,
1294 operations.AlterModelManagers(
1295 name=model_name,
1296 managers=new_model_state.managers,
1297 ),
1298 )
1299
1300 def arrange_for_graph(self, changes, graph, migration_name=None):
1301 """
1302 Take a result from changes() and a MigrationGraph, and fix the names
1303 and dependencies of the changes so they extend the graph from the leaf
1304 nodes for each app.
1305 """
1306 leaves = graph.leaf_nodes()
1307 name_map = {}
1308 for package_label, migrations in list(changes.items()):
1309 if not migrations:
1310 continue
1311 # Find the app label's current leaf node
1312 app_leaf = None
1313 for leaf in leaves:
1314 if leaf[0] == package_label:
1315 app_leaf = leaf
1316 break
1317 # Do they want an initial migration for this app?
1318 if app_leaf is None and not self.questioner.ask_initial(package_label):
1319 # They don't.
1320 for migration in migrations:
1321 name_map[(package_label, migration.name)] = (
1322 package_label,
1323 "__first__",
1324 )
1325 del changes[package_label]
1326 continue
1327 # Work out the next number in the sequence
1328 if app_leaf is None:
1329 next_number = 1
1330 else:
1331 next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1332 # Name each migration
1333 for i, migration in enumerate(migrations):
1334 if i == 0 and app_leaf:
1335 migration.dependencies.append(app_leaf)
1336 new_name_parts = ["%04i" % next_number] # noqa: UP031
1337 if migration_name:
1338 new_name_parts.append(migration_name)
1339 elif i == 0 and not app_leaf:
1340 new_name_parts.append("initial")
1341 else:
1342 new_name_parts.append(migration.suggest_name()[:100])
1343 new_name = "_".join(new_name_parts)
1344 name_map[(package_label, migration.name)] = (package_label, new_name)
1345 next_number += 1
1346 migration.name = new_name
1347 # Now fix dependencies
1348 for migrations in changes.values():
1349 for migration in migrations:
1350 migration.dependencies = [
1351 name_map.get(d, d) for d in migration.dependencies
1352 ]
1353 return changes
1354
1355 def _trim_to_packages(self, changes, package_labels):
1356 """
1357 Take changes from arrange_for_graph() and set of app labels, and return
1358 a modified set of changes which trims out as many migrations that are
1359 not in package_labels as possible. Note that some other migrations may
1360 still be present as they may be required dependencies.
1361 """
1362 # Gather other app dependencies in a first pass
1363 app_dependencies = {}
1364 for package_label, migrations in changes.items():
1365 for migration in migrations:
1366 for dep_package_label, name in migration.dependencies:
1367 app_dependencies.setdefault(package_label, set()).add(
1368 dep_package_label
1369 )
1370 required_packages = set(package_labels)
1371 # Keep resolving till there's no change
1372 old_required_packages = None
1373 while old_required_packages != required_packages:
1374 old_required_packages = set(required_packages)
1375 required_packages.update(
1376 *[
1377 app_dependencies.get(package_label, ())
1378 for package_label in required_packages
1379 ]
1380 )
1381 # Remove all migrations that aren't needed
1382 for package_label in list(changes):
1383 if package_label not in required_packages:
1384 del changes[package_label]
1385 return changes
1386
1387 @classmethod
1388 def parse_number(cls, name):
1389 """
1390 Given a migration name, try to extract a number from the beginning of
1391 it. For a squashed migration such as '0001_squashed_0004…', return the
1392 second number. If no number is found, return None.
1393 """
1394 if squashed_match := re.search(r".*_squashed_(\d+)", name):
1395 return int(squashed_match[1])
1396 match = re.match(r"^\d+", name)
1397 if match:
1398 return int(match[0])
1399 return None