import functools
import re
from graphlib import TopologicalSorter

from plain import models
from plain.models.migrations import operations
from plain.models.migrations.migration import Migration, SettingsTuple
from plain.models.migrations.operations.models import AlterModelOptions
from plain.models.migrations.optimizer import MigrationOptimizer
from plain.models.migrations.questioner import MigrationQuestioner
from plain.models.migrations.utils import (
    COMPILED_REGEX_TYPE,
    RegexObject,
    resolve_relation,
)
from plain.runtime import settings


class MigrationAutodetector:
    """
    Take a pair of ProjectStates and compare them to see what the first would
    need doing to make it match the second (the second usually being the
    project's current state).

    Note that this naturally operates on entire projects at a time,
    as it's likely that changes interact (for example, you can't
    add a ForeignKey without having a migration to add the table it
    depends on first). A user interface may offer single-app usage
    if it wishes, with the caveat that it may not always be possible.
    """

    def __init__(self, from_state, to_state, questioner=None):
        self.from_state = from_state
        self.to_state = to_state
        self.questioner = questioner or MigrationQuestioner()
        self.existing_packages = {app for app, model in from_state.models}

    def changes(
        self, graph, trim_to_packages=None, convert_packages=None, migration_name=None
    ):
        """
        Main entry point to produce a list of applicable changes.
        Take a graph to base names on and an optional set of packages
        to try and restrict to (restriction is not guaranteed)
        """
        changes = self._detect_changes(convert_packages, graph)
        changes = self.arrange_for_graph(changes, graph, migration_name)
        if trim_to_packages:
            changes = self._trim_to_packages(changes, trim_to_packages)
        return changes

    def deep_deconstruct(self, obj):
        """
        Recursive deconstruction for a field and its arguments.
        Used for full comparison for rename/alter; sometimes a single-level
        deconstruction will not compare correctly.
        """
        if isinstance(obj, list):
            return [self.deep_deconstruct(value) for value in obj]
        elif isinstance(obj, tuple):
            return tuple(self.deep_deconstruct(value) for value in obj)
        elif isinstance(obj, dict):
            return {key: self.deep_deconstruct(value) for key, value in obj.items()}
        elif isinstance(obj, functools.partial):
            return (
                obj.func,
                self.deep_deconstruct(obj.args),
                self.deep_deconstruct(obj.keywords),
            )
        elif isinstance(obj, COMPILED_REGEX_TYPE):
            return RegexObject(obj)
        elif isinstance(obj, type):
            # If this is a type that implements 'deconstruct' as an instance method,
            # avoid treating this as being deconstructible itself - see #22951
            return obj
        elif hasattr(obj, "deconstruct"):
            deconstructed = obj.deconstruct()
            if isinstance(obj, models.Field):
                # we have a field which also returns a name
                deconstructed = deconstructed[1:]
            path, args, kwargs = deconstructed
            return (
                path,
                [self.deep_deconstruct(value) for value in args],
                {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
            )
        else:
            return obj

    def only_relation_agnostic_fields(self, fields):
        """
        Return a definition of the fields that ignores field names and
        what related fields actually relate to. Used for detecting renames (as
        the related fields change during renames).
        """
        fields_def = []
        for name, field in sorted(fields.items()):
            deconstruction = self.deep_deconstruct(field)
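            # deep_deconstruct() yields (path, args, kwargs); the "to" kwarg is
            # popped below so the relation's target (which changes when a model
            # is renamed) doesn't affect the comparison.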
            if field.remote_field and field.remote_field.model:
                deconstruction[2].pop("to", None)
            fields_def.append(deconstruction)
        return fields_def

    def _detect_changes(self, convert_packages=None, graph=None):
        """
        Return a dict of migration plans which will achieve the
        change from from_state to to_state. The dict has app labels
        as keys and a list of migrations as values.

        The resulting migrations aren't specially named, but the names
        do matter for dependencies inside the set.

        convert_packages is the list of packages to convert to use migrations
        (i.e. to make initial migrations for, in the usual case)

        graph is an optional argument that, if provided, can help improve
        dependency generation and avoid potential circular dependencies.
        """
        # The first phase is generating all the operations for each app
        # and gathering them into a big per-app list.
        # Then go through that list, order it, and split into migrations to
        # resolve dependencies caused by M2Ms and FKs.
        self.generated_operations = {}
        self.altered_indexes = {}
        self.altered_constraints = {}
        self.renamed_fields = {}

        # Prepare some old/new state and model lists, ignoring unmigrated packages.
        self.old_model_keys = set()
        self.new_model_keys = set()
        for (package_label, model_name), model_state in self.from_state.models.items():
            if package_label not in self.from_state.real_packages:
                self.old_model_keys.add((package_label, model_name))

        for (package_label, model_name), model_state in self.to_state.models.items():
            if package_label not in self.from_state.real_packages or (
                convert_packages and package_label in convert_packages
            ):
                self.new_model_keys.add((package_label, model_name))

        self.from_state.resolve_fields_and_relations()
        self.to_state.resolve_fields_and_relations()

        # Renames have to come first
        self.generate_renamed_models()

        # Prepare lists of fields and generate through model map
        self._prepare_field_lists()
        self._generate_through_model_map()

        # Generate non-rename model operations
        self.generate_deleted_models()
        self.generate_created_models()
        self.generate_altered_options()
        self.generate_altered_db_table_comment()

        # Create the renamed fields and store them in self.renamed_fields.
        # They are used by create_altered_indexes(), generate_altered_fields(),
        # generate_removed_altered_index(), and
        # generate_altered_index().
        self.create_renamed_fields()
        # Create the altered indexes and store them in self.altered_indexes.
        # This avoids the same computation in generate_removed_indexes()
        # and generate_added_indexes().
        self.create_altered_indexes()
        self.create_altered_constraints()
        # Generate index removal operations before field is removed
        self.generate_removed_constraints()
        self.generate_removed_indexes()
        # Generate field renaming operations.
        self.generate_renamed_fields()
        self.generate_renamed_indexes()
        # Generate field operations.
        self.generate_removed_fields()
        self.generate_added_fields()
        self.generate_altered_fields()
        self.generate_added_indexes()
        self.generate_added_constraints()
        self.generate_altered_db_table()

        self._sort_migrations()
        self._build_migration_list(graph)
        self._optimize_migrations()

        return self.migrations

    def _prepare_field_lists(self):
        """
        Prepare field lists and a list of the fields that used through models
        in the old state so dependencies can be made from the through model
        deletion to the field that uses it.
        """
        self.kept_model_keys = self.old_model_keys & self.new_model_keys
        self.through_users = {}
        self.old_field_keys = {
            (package_label, model_name, field_name)
            for package_label, model_name in self.kept_model_keys
            for field_name in self.from_state.models[
                package_label,
                self.renamed_models.get((package_label, model_name), model_name),
            ].fields
        }
        self.new_field_keys = {
            (package_label, model_name, field_name)
            for package_label, model_name in self.kept_model_keys
            for field_name in self.to_state.models[package_label, model_name].fields
        }

    def _generate_through_model_map(self):
        """Through model map generation."""
        for package_label, model_name in sorted(self.old_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            for field_name, field in old_model_state.fields.items():
                if hasattr(field, "remote_field") and getattr(
                    field.remote_field, "through", None
                ):
                    through_key = resolve_relation(
                        field.remote_field.through, package_label, model_name
                    )
                    self.through_users[through_key] = (
                        package_label,
                        old_model_name,
                        field_name,
                    )

    @staticmethod
    def _resolve_dependency(dependency):
        """
        Return the resolved dependency and a boolean denoting whether or not
        it was a settings dependency.
        """
        if not isinstance(dependency, SettingsTuple):
            return dependency, False
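        # The named setting holds a "package_label.ModelName" string; split it
        # and lowercase the model name so it lines up with the lowercased model
        # keys used throughout the autodetector.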
        resolved_package_label, resolved_object_name = getattr(
            settings, dependency[1]
        ).split(".")
        return (resolved_package_label, resolved_object_name.lower()) + dependency[
            2:
        ], True

    def _build_migration_list(self, graph=None):
        """
        Chop the lists of operations up into migrations with dependencies on
        each other. Do this by going through an app's list of operations until
        one is found that has an outgoing dependency that isn't in another
        app's migration yet (hasn't been chopped off its list). Then chop off
        the operations before it into a migration and move onto the next app.
        If the loop completes without doing anything, there's a circular
        dependency (which _should_ be impossible as the operations are
        all split at this point so they can't depend and be depended on).
        """
        self.migrations = {}
        num_ops = sum(len(x) for x in self.generated_operations.values())
        chop_mode = False
        while num_ops:
            # On every iteration, we step through all the packages and see if there
            # is a completed set of operations.
            # If we find that a subset of the operations are complete we can
            # try to chop it off from the rest and continue, but we only
            # do this if we've already been through the list once before
            # without any chopping and nothing has changed.
            for package_label in sorted(self.generated_operations):
                chopped = []
                dependencies = set()
                for operation in list(self.generated_operations[package_label]):
                    deps_satisfied = True
                    operation_dependencies = set()
                    for dep in operation._auto_deps:
                        # Temporarily resolve the settings dependency to
                        # prevent circular references. While keeping the
                        # dependency checks on the resolved model, add the
                        # settings dependencies.
                        original_dep = dep
                        dep, is_settings_dep = self._resolve_dependency(dep)
                        if dep[0] != package_label:
                            # External app dependency. See if it's not yet
                            # satisfied.
                            for other_operation in self.generated_operations.get(
                                dep[0], []
                            ):
                                if self.check_dependency(other_operation, dep):
                                    deps_satisfied = False
                                    break
                            if not deps_satisfied:
                                break
                            else:
                                if is_settings_dep:
                                    operation_dependencies.add(
                                        (original_dep[0], original_dep[1])
                                    )
                                elif dep[0] in self.migrations:
                                    operation_dependencies.add(
                                        (dep[0], self.migrations[dep[0]][-1].name)
                                    )
                                else:
                                    # If we can't find the other app, we add a
                                    # first/last dependency, but only if we've
                                    # already been through once and checked
                                    # everything.
                                    if chop_mode:
                                        # If the app already exists, we add a
                                        # dependency on the last migration, as
                                        # we don't know which migration
                                        # contains the target field. If it's
                                        # not yet migrated or has no
                                        # migrations, we use __first__.
                                        if graph and graph.leaf_nodes(dep[0]):
                                            operation_dependencies.add(
                                                graph.leaf_nodes(dep[0])[0]
                                            )
                                        else:
                                            operation_dependencies.add(
                                                (dep[0], "__first__")
                                            )
                                    else:
                                        deps_satisfied = False
                    if deps_satisfied:
                        chopped.append(operation)
                        dependencies.update(operation_dependencies)
                        del self.generated_operations[package_label][0]
                    else:
                        break
                # Make a migration! Well, only if there's stuff to put in it
                if dependencies or chopped:
                    if not self.generated_operations[package_label] or chop_mode:
                        subclass = type(
                            "Migration",
                            (Migration,),
                            {"operations": [], "dependencies": []},
                        )
                        instance = subclass(
                            "auto_%i"  # noqa: UP031
                            % (len(self.migrations.get(package_label, [])) + 1),
                            package_label,
                        )
                        instance.dependencies = list(dependencies)
                        instance.operations = chopped
                        instance.initial = package_label not in self.existing_packages
                        self.migrations.setdefault(package_label, []).append(instance)
                        chop_mode = False
                    else:
                        self.generated_operations[package_label] = (
                            chopped + self.generated_operations[package_label]
                        )
            new_num_ops = sum(len(x) for x in self.generated_operations.values())
            if new_num_ops == num_ops:
                if not chop_mode:
                    chop_mode = True
                else:
                    raise ValueError(
                        f"Cannot resolve operation dependencies: {self.generated_operations!r}"
                    )
            num_ops = new_num_ops

    def _sort_migrations(self):
        """
        Reorder to make things possible. Reordering may be needed so FKs work
        nicely inside the same app.
        """
        for package_label, ops in sorted(self.generated_operations.items()):
            ts = TopologicalSorter()
            for op in ops:
                ts.add(op)
                for dep in op._auto_deps:
                    # Resolve intra-app dependencies to handle circular
                    # references involving a settings model.
                    dep = self._resolve_dependency(dep)[0]
                    if dep[0] != package_label:
                        continue
                    ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
            self.generated_operations[package_label] = list(ts.static_order())

    def _optimize_migrations(self):
        # Add in internal dependencies among the migrations
        for package_label, migrations in self.migrations.items():
            for m1, m2 in zip(migrations, migrations[1:]):
                m2.dependencies.append((package_label, m1.name))

        # De-dupe dependencies
        for migrations in self.migrations.values():
            for migration in migrations:
                migration.dependencies = list(set(migration.dependencies))

        # Optimize migrations
        for package_label, migrations in self.migrations.items():
            for migration in migrations:
                migration.operations = MigrationOptimizer().optimize(
                    migration.operations, package_label
                )

    def check_dependency(self, operation, dependency):
        """
        Return True if the given operation depends on the given dependency,
        False otherwise.
        """
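        # A dependency is a (package_label, model_name, field_name, action)
        # tuple: field_name is None for model-level dependencies, and action is
        # True (created), False (removed), or "alter" (field altered).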
        # Created model
        if dependency[2] is None and dependency[3] is True:
            return (
                isinstance(operation, operations.CreateModel)
                and operation.name_lower == dependency[1].lower()
            )
        # Created field
        elif dependency[2] is not None and dependency[3] is True:
            return (
                isinstance(operation, operations.CreateModel)
                and operation.name_lower == dependency[1].lower()
                and any(dependency[2] == x for x, y in operation.fields)
            ) or (
                isinstance(operation, operations.AddField)
                and operation.model_name_lower == dependency[1].lower()
                and operation.name_lower == dependency[2].lower()
            )
        # Removed field
        elif dependency[2] is not None and dependency[3] is False:
            return (
                isinstance(operation, operations.RemoveField)
                and operation.model_name_lower == dependency[1].lower()
                and operation.name_lower == dependency[2].lower()
            )
        # Removed model
        elif dependency[2] is None and dependency[3] is False:
            return (
                isinstance(operation, operations.DeleteModel)
                and operation.name_lower == dependency[1].lower()
            )
        # Field being altered
        elif dependency[2] is not None and dependency[3] == "alter":
            return (
                isinstance(operation, operations.AlterField)
                and operation.model_name_lower == dependency[1].lower()
                and operation.name_lower == dependency[2].lower()
            )
        # Unknown dependency. Raise an error.
        else:
            raise ValueError(f"Can't handle dependency {dependency!r}")

    def add_operation(
        self, package_label, operation, dependencies=None, beginning=False
    ):
        # Dependencies are
        # (package_label, model_name, field_name, create/delete as True/False)
        operation._auto_deps = dependencies or []
        if beginning:
            self.generated_operations.setdefault(package_label, []).insert(0, operation)
        else:
            self.generated_operations.setdefault(package_label, []).append(operation)

    def generate_renamed_models(self):
        """
        Find any renamed models, generate the operations for them, and remove
        the old entry from the model lists. Must be run before other
        model-level generation.
        """
        self.renamed_models = {}
        self.renamed_models_rel = {}
        added_models = self.new_model_keys - self.old_model_keys
        for package_label, model_name in sorted(added_models):
            model_state = self.to_state.models[package_label, model_name]
            model_fields_def = self.only_relation_agnostic_fields(model_state.fields)

            removed_models = self.old_model_keys - self.new_model_keys
            for rem_package_label, rem_model_name in removed_models:
                if rem_package_label == package_label:
                    rem_model_state = self.from_state.models[
                        rem_package_label, rem_model_name
                    ]
                    rem_model_fields_def = self.only_relation_agnostic_fields(
                        rem_model_state.fields
                    )
                    if model_fields_def == rem_model_fields_def:
                        if self.questioner.ask_rename_model(
                            rem_model_state, model_state
                        ):
                            dependencies = []
                            fields = list(model_state.fields.values()) + [
                                field.remote_field
                                for relations in self.to_state.relations[
                                    package_label, model_name
                                ].values()
                                for field in relations.values()
                            ]
                            for field in fields:
                                if field.is_relation:
                                    dependencies.extend(
                                        self._get_dependencies_for_foreign_key(
                                            package_label,
                                            model_name,
                                            field,
                                            self.to_state,
                                        )
                                    )
                            self.add_operation(
                                package_label,
                                operations.RenameModel(
                                    old_name=rem_model_state.name,
                                    new_name=model_state.name,
                                ),
                                dependencies=dependencies,
                            )
                            self.renamed_models[package_label, model_name] = (
                                rem_model_name
                            )
                            renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
                            self.renamed_models_rel[renamed_models_rel_key] = (
                                f"{model_state.package_label}.{model_state.name_lower}"
                            )
                            self.old_model_keys.remove(
                                (rem_package_label, rem_model_name)
                            )
                            self.old_model_keys.add((package_label, model_name))
                            break

    def generate_created_models(self):
        """
        Find all new models and make create
        operations for them as well as separate operations to create any
        foreign key or M2M relationships (these are optimized later, if
        possible).

        Defer any model options that refer to collections of fields that might
        be deferred.
        """
        added_models = self.new_model_keys - self.old_model_keys

        for package_label, model_name in added_models:
            model_state = self.to_state.models[package_label, model_name]
            # Gather related fields
            related_fields = {}
            primary_key_rel = None
            for field_name, field in model_state.fields.items():
                if field.remote_field:
                    if field.remote_field.model:
                        if field.primary_key:
                            primary_key_rel = field.remote_field.model
                        else:
                            related_fields[field_name] = field
                    if getattr(field.remote_field, "through", None):
                        related_fields[field_name] = field

            # Are there indexes to defer?
            indexes = model_state.options.pop("indexes")
            constraints = model_state.options.pop("constraints")
            # Depend on the deletion of any possible proxy version of us
            dependencies = [
                (package_label, model_name, None, False),
            ]
            # Depend on all bases
            for base in model_state.bases:
                if isinstance(base, str) and "." in base:
                    base_package_label, base_name = base.split(".", 1)
                    dependencies.append((base_package_label, base_name, None, True))
                    # Depend on the removal of base fields if the new model has
                    # a field with the same name.
                    old_base_model_state = self.from_state.models.get(
                        (base_package_label, base_name)
                    )
                    new_base_model_state = self.to_state.models.get(
                        (base_package_label, base_name)
                    )
                    if old_base_model_state and new_base_model_state:
                        removed_base_fields = (
                            set(old_base_model_state.fields)
                            .difference(
                                new_base_model_state.fields,
                            )
                            .intersection(model_state.fields)
                        )
                        for removed_base_field in removed_base_fields:
                            dependencies.append(
                                (
                                    base_package_label,
                                    base_name,
                                    removed_base_field,
                                    False,
                                )
                            )
            # Depend on the other end of the primary key if it's a relation
            if primary_key_rel:
                dependencies.append(
                    resolve_relation(
                        primary_key_rel,
                        package_label,
                        model_name,
                    )
                    + (None, True)
                )
            # Generate creation operation
            self.add_operation(
                package_label,
                operations.CreateModel(
                    name=model_state.name,
                    fields=[
                        d
                        for d in model_state.fields.items()
                        if d[0] not in related_fields
                    ],
                    options=model_state.options,
                    bases=model_state.bases,
                ),
                dependencies=dependencies,
                beginning=True,
            )

            # Generate operations for each related field
            for name, field in sorted(related_fields.items()):
                dependencies = self._get_dependencies_for_foreign_key(
                    package_label,
                    model_name,
                    field,
                    self.to_state,
                )
                # Depend on our own model being created
                dependencies.append((package_label, model_name, None, True))
                # Make operation
                self.add_operation(
                    package_label,
                    operations.AddField(
                        model_name=model_name,
                        name=name,
                        field=field,
                    ),
                    dependencies=list(set(dependencies)),
                )

            related_dependencies = [
                (package_label, model_name, name, True)
                for name in sorted(related_fields)
            ]
            related_dependencies.append((package_label, model_name, None, True))
            for index in indexes:
                self.add_operation(
                    package_label,
                    operations.AddIndex(
                        model_name=model_name,
                        index=index,
                    ),
                    dependencies=related_dependencies,
                )
            for constraint in constraints:
                self.add_operation(
                    package_label,
                    operations.AddConstraint(
                        model_name=model_name,
                        constraint=constraint,
                    ),
                    dependencies=related_dependencies,
                )

    def generate_deleted_models(self):
        """
        Find all deleted models and make delete
        operations for them as well as separate operations to delete any
        foreign key or M2M relationships (these are optimized later, if
        possible).

        Also bring forward removal of any model options that refer to
        collections of fields - the inverse of generate_created_models().
        """
        deleted_models = self.old_model_keys - self.new_model_keys

        for package_label, model_name in sorted(deleted_models):
            model_state = self.from_state.models[package_label, model_name]
            # Gather related fields
            related_fields = {}
            for field_name, field in model_state.fields.items():
                if field.remote_field:
                    if field.remote_field.model:
                        related_fields[field_name] = field
                    if getattr(field.remote_field, "through", None):
                        related_fields[field_name] = field

            # Then remove each related field
            for name in sorted(related_fields):
                self.add_operation(
                    package_label,
                    operations.RemoveField(
                        model_name=model_name,
                        name=name,
                    ),
                )
            # Finally, remove the model.
            # This depends on both the removal/alteration of all incoming fields
            # and the removal of all its own related fields, and if it's
            # a through model the field that references it.
            dependencies = []
            relations = self.from_state.relations
            for (
                related_object_package_label,
                object_name,
            ), relation_related_fields in relations[package_label, model_name].items():
                for field_name, field in relation_related_fields.items():
                    dependencies.append(
                        (related_object_package_label, object_name, field_name, False),
                    )
                    if not field.many_to_many:
                        dependencies.append(
                            (
                                related_object_package_label,
                                object_name,
                                field_name,
                                "alter",
                            ),
                        )

            for name in sorted(related_fields):
                dependencies.append((package_label, model_name, name, False))
            # We're referenced in another field's through=
            through_user = self.through_users.get(
                (package_label, model_state.name_lower)
            )
            if through_user:
                dependencies.append(
                    (through_user[0], through_user[1], through_user[2], False)
                )
            # Finally, make the operation, deduping any dependencies
            self.add_operation(
                package_label,
                operations.DeleteModel(
                    name=model_state.name,
                ),
                dependencies=list(set(dependencies)),
            )

    def create_renamed_fields(self):
        """Work out renamed fields."""
        self.renamed_operations = []
        old_field_keys = self.old_field_keys.copy()
        for package_label, model_name, field_name in sorted(
            self.new_field_keys - old_field_keys
        ):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]
            field = new_model_state.get_field(field_name)
            # Scan to see if this is actually a rename!
            field_dec = self.deep_deconstruct(field)
            for rem_package_label, rem_model_name, rem_field_name in sorted(
                old_field_keys - self.new_field_keys
            ):
                if rem_package_label == package_label and rem_model_name == model_name:
                    old_field = old_model_state.get_field(rem_field_name)
                    old_field_dec = self.deep_deconstruct(old_field)
                    if (
                        field.remote_field
                        and field.remote_field.model
                        and "to" in old_field_dec[2]
                    ):
                        old_rel_to = old_field_dec[2]["to"]
                        if old_rel_to in self.renamed_models_rel:
                            old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
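                    # Recompute the old field's attname/column from the old name
                    # so a rename that also sets db_column to that old implicit
                    # column is still detected as a pure rename below.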
                    old_field.set_attributes_from_name(rem_field_name)
                    old_db_column = old_field.get_attname_column()[1]
                    if old_field_dec == field_dec or (
                        # Was the field renamed and db_column equal to the
                        # old field's column added?
                        old_field_dec[0:2] == field_dec[0:2]
                        and dict(old_field_dec[2], db_column=old_db_column)
                        == field_dec[2]
                    ):
                        if self.questioner.ask_rename(
                            model_name, rem_field_name, field_name, field
                        ):
                            self.renamed_operations.append(
                                (
                                    rem_package_label,
                                    rem_model_name,
                                    old_field.db_column,
                                    rem_field_name,
                                    package_label,
                                    model_name,
                                    field,
                                    field_name,
                                )
                            )
                            old_field_keys.remove(
                                (rem_package_label, rem_model_name, rem_field_name)
                            )
                            old_field_keys.add((package_label, model_name, field_name))
                            self.renamed_fields[
                                package_label, model_name, field_name
                            ] = rem_field_name
                            break

    def generate_renamed_fields(self):
        """Generate RenameField operations."""
        for (
            rem_package_label,
            rem_model_name,
            rem_db_column,
            rem_field_name,
            package_label,
            model_name,
            field,
            field_name,
        ) in self.renamed_operations:
            # A db_column mismatch requires a prior noop AlterField for the
            # subsequent RenameField to be a noop on attempts at preserving the
            # old name.
            if rem_db_column != field.db_column:
                altered_field = field.clone()
                altered_field.name = rem_field_name
                self.add_operation(
                    package_label,
                    operations.AlterField(
                        model_name=model_name,
                        name=rem_field_name,
                        field=altered_field,
                    ),
                )
            self.add_operation(
                package_label,
                operations.RenameField(
                    model_name=model_name,
                    old_name=rem_field_name,
                    new_name=field_name,
                ),
            )
            self.old_field_keys.remove(
                (rem_package_label, rem_model_name, rem_field_name)
            )
            self.old_field_keys.add((package_label, model_name, field_name))

    def generate_added_fields(self):
        """Make AddField operations."""
        for package_label, model_name, field_name in sorted(
            self.new_field_keys - self.old_field_keys
        ):
            self._generate_added_field(package_label, model_name, field_name)

    def _generate_added_field(self, package_label, model_name, field_name):
        field = self.to_state.models[package_label, model_name].get_field(field_name)
        # Adding a field always depends at least on its removal.
        dependencies = [(package_label, model_name, field_name, False)]
        # Fields that are foreignkeys/m2ms depend on stuff.
        if field.remote_field and field.remote_field.model:
            dependencies.extend(
                self._get_dependencies_for_foreign_key(
                    package_label,
                    model_name,
                    field,
                    self.to_state,
                )
            )
        # You can't just add NOT NULL fields with no default or fields
        # which don't allow empty strings as default.
        time_fields = (models.DateField, models.DateTimeField, models.TimeField)
        preserve_default = (
            field.allow_null
            or field.has_default()
            or field.many_to_many
            or (not field.required and field.empty_strings_allowed)
            or (isinstance(field, time_fields) and field.auto_now)
        )
        if not preserve_default:
            field = field.clone()
            if isinstance(field, time_fields) and field.auto_now_add:
                field.default = self.questioner.ask_auto_now_add_addition(
                    field_name, model_name
                )
            else:
                field.default = self.questioner.ask_not_null_addition(
                    field_name, model_name
                )
        if (
            field.primary_key
            and field.default is not models.NOT_PROVIDED
            and callable(field.default)
        ):
            self.questioner.ask_unique_callable_default_addition(field_name, model_name)
        self.add_operation(
            package_label,
            operations.AddField(
                model_name=model_name,
                name=field_name,
                field=field,
                preserve_default=preserve_default,
            ),
            dependencies=dependencies,
        )

    def generate_removed_fields(self):
        """Make RemoveField operations."""
        for package_label, model_name, field_name in sorted(
            self.old_field_keys - self.new_field_keys
        ):
            self._generate_removed_field(package_label, model_name, field_name)

    def _generate_removed_field(self, package_label, model_name, field_name):
        self.add_operation(
            package_label,
            operations.RemoveField(
                model_name=model_name,
                name=field_name,
            ),
        )

    def generate_altered_fields(self):
        """
        Make AlterField operations, or possibly RemoveField/AddField if alter
        isn't possible.
        """
        for package_label, model_name, field_name in sorted(
            self.old_field_keys & self.new_field_keys
        ):
            # Did the field change?
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_field_name = self.renamed_fields.get(
                (package_label, model_name, field_name), field_name
            )
            old_field = self.from_state.models[package_label, old_model_name].get_field(
                old_field_name
            )
            new_field = self.to_state.models[package_label, model_name].get_field(
                field_name
            )
            dependencies = []
            # Implement any model renames on relations; these are handled by RenameModel
            # so we need to exclude them from the comparison
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "model", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.model, package_label, model_name
                )
                if rename_key in self.renamed_models:
                    new_field.remote_field.model = old_field.remote_field.model
                # Handle ForeignKey which can only have a single to_field.
                remote_field_name = getattr(new_field.remote_field, "field_name", None)
                if remote_field_name:
                    to_field_rename_key = rename_key + (remote_field_name,)
                    if to_field_rename_key in self.renamed_fields:
                        # Repoint model name only
                        new_field.remote_field.model = old_field.remote_field.model
                dependencies.extend(
                    self._get_dependencies_for_foreign_key(
                        package_label,
                        model_name,
                        new_field,
                        self.to_state,
                    )
                )
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "through", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.through, package_label, model_name
                )
                if rename_key in self.renamed_models:
                    new_field.remote_field.through = old_field.remote_field.through
            old_field_dec = self.deep_deconstruct(old_field)
            new_field_dec = self.deep_deconstruct(new_field)
            # If the field was confirmed to be renamed it means that only
            # db_column was allowed to change which generate_renamed_fields()
            # already accounts for by adding an AlterField operation.
            if old_field_dec != new_field_dec and old_field_name == field_name:
                both_m2m = old_field.many_to_many and new_field.many_to_many
                neither_m2m = not old_field.many_to_many and not new_field.many_to_many
                if both_m2m or neither_m2m:
                    # Either both fields are m2m or neither is
                    preserve_default = True
                    if (
                        old_field.allow_null
                        and not new_field.allow_null
                        and not new_field.has_default()
                        and not new_field.many_to_many
                    ):
                        field = new_field.clone()
                        new_default = self.questioner.ask_not_null_alteration(
                            field_name, model_name
                        )
                        if new_default is not models.NOT_PROVIDED:
                            field.default = new_default
                            preserve_default = False
                    else:
                        field = new_field
                    self.add_operation(
                        package_label,
                        operations.AlterField(
                            model_name=model_name,
                            name=field_name,
                            field=field,
                            preserve_default=preserve_default,
                        ),
                        dependencies=dependencies,
                    )
                else:
                    # We cannot alter between m2m and concrete fields
                    self._generate_removed_field(package_label, model_name, field_name)
                    self._generate_added_field(package_label, model_name, field_name)

    def create_altered_indexes(self):
        option_name = operations.AddIndex.option_name

        for package_label, model_name in sorted(self.kept_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]

            old_indexes = old_model_state.options[option_name]
            new_indexes = new_model_state.options[option_name]
            added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
            removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
            renamed_indexes = []
            # Find renamed indexes.
            remove_from_added = []
            remove_from_removed = []
            for new_index in added_indexes:
                new_index_dec = new_index.deconstruct()
                new_index_name = new_index_dec[2].pop("name")
                for old_index in removed_indexes:
                    old_index_dec = old_index.deconstruct()
                    old_index_name = old_index_dec[2].pop("name")
                    # Indexes are the same except for the names.
                    if (
                        new_index_dec == old_index_dec
                        and new_index_name != old_index_name
                    ):
                        renamed_indexes.append((old_index_name, new_index_name, None))
                        remove_from_added.append(new_index)
                        remove_from_removed.append(old_index)

            # Remove renamed indexes from the lists of added and removed
            # indexes.
            added_indexes = [
                idx for idx in added_indexes if idx not in remove_from_added
            ]
            removed_indexes = [
                idx for idx in removed_indexes if idx not in remove_from_removed
            ]

            self.altered_indexes.update(
                {
                    (package_label, model_name): {
                        "added_indexes": added_indexes,
                        "removed_indexes": removed_indexes,
                        "renamed_indexes": renamed_indexes,
                    }
                }
            )

    def generate_added_indexes(self):
        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
            dependencies = self._get_dependencies_for_model(package_label, model_name)
            for index in alt_indexes["added_indexes"]:
                self.add_operation(
                    package_label,
                    operations.AddIndex(
                        model_name=model_name,
                        index=index,
                    ),
                    dependencies=dependencies,
                )

    def generate_removed_indexes(self):
        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
            for index in alt_indexes["removed_indexes"]:
                self.add_operation(
                    package_label,
                    operations.RemoveIndex(
                        model_name=model_name,
                        name=index.name,
                    ),
                )

    def generate_renamed_indexes(self):
        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
            for old_index_name, new_index_name, old_fields in alt_indexes[
                "renamed_indexes"
            ]:
                self.add_operation(
                    package_label,
                    operations.RenameIndex(
                        model_name=model_name,
                        new_name=new_index_name,
                        old_name=old_index_name,
                        old_fields=old_fields,
                    ),
                )

    def create_altered_constraints(self):
        option_name = operations.AddConstraint.option_name
        for package_label, model_name in sorted(self.kept_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]

            old_constraints = old_model_state.options[option_name]
            new_constraints = new_model_state.options[option_name]
            add_constraints = [c for c in new_constraints if c not in old_constraints]
            rem_constraints = [c for c in old_constraints if c not in new_constraints]

            self.altered_constraints.update(
                {
                    (package_label, model_name): {
                        "added_constraints": add_constraints,
                        "removed_constraints": rem_constraints,
                    }
                }
            )

    def generate_added_constraints(self):
        for (
            package_label,
            model_name,
        ), alt_constraints in self.altered_constraints.items():
            dependencies = self._get_dependencies_for_model(package_label, model_name)
            for constraint in alt_constraints["added_constraints"]:
                self.add_operation(
                    package_label,
                    operations.AddConstraint(
                        model_name=model_name,
                        constraint=constraint,
                    ),
                    dependencies=dependencies,
                )

    def generate_removed_constraints(self):
        for (
            package_label,
            model_name,
        ), alt_constraints in self.altered_constraints.items():
            for constraint in alt_constraints["removed_constraints"]:
                self.add_operation(
                    package_label,
                    operations.RemoveConstraint(
                        model_name=model_name,
                        name=constraint.name,
                    ),
                )

    @staticmethod
    def _get_dependencies_for_foreign_key(
        package_label, model_name, field, project_state
    ):
        remote_field_model = None
        if hasattr(field.remote_field, "model"):
            remote_field_model = field.remote_field.model
        else:
            relations = project_state.relations[package_label, model_name]
            for (remote_package_label, remote_model_name), fields in relations.items():
                if any(
                    field == related_field.remote_field
                    for related_field in fields.values()
                ):
                    remote_field_model = f"{remote_package_label}.{remote_model_name}"
                    break
        dep_package_label, dep_object_name = resolve_relation(
            remote_field_model,
            package_label,
            model_name,
        )
        dependencies = [(dep_package_label, dep_object_name, None, True)]
        if getattr(field.remote_field, "through", None):
            through_package_label, through_object_name = resolve_relation(
                field.remote_field.through,
                package_label,
                model_name,
            )
            dependencies.append(
                (through_package_label, through_object_name, None, True)
            )
        return dependencies

    def _get_dependencies_for_model(self, package_label, model_name):
        """Return foreign key dependencies of the given model."""
        dependencies = []
        model_state = self.to_state.models[package_label, model_name]
        for field in model_state.fields.values():
            if field.is_relation:
                dependencies.extend(
                    self._get_dependencies_for_foreign_key(
                        package_label,
                        model_name,
                        field,
                        self.to_state,
                    )
                )
        return dependencies

    def generate_altered_db_table(self):
        for package_label, model_name in sorted(self.kept_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]
            old_db_table_name = old_model_state.options.get("db_table")
            new_db_table_name = new_model_state.options.get("db_table")
            if old_db_table_name != new_db_table_name:
                self.add_operation(
                    package_label,
                    operations.AlterModelTable(
                        name=model_name,
                        table=new_db_table_name,
                    ),
                )

    def generate_altered_db_table_comment(self):
        for package_label, model_name in sorted(self.kept_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]

            old_db_table_comment = old_model_state.options.get("db_table_comment")
            new_db_table_comment = new_model_state.options.get("db_table_comment")
            if old_db_table_comment != new_db_table_comment:
                self.add_operation(
                    package_label,
                    operations.AlterModelTableComment(
                        name=model_name,
                        table_comment=new_db_table_comment,
                    ),
                )

    def generate_altered_options(self):
        """
        Work out if any non-schema-affecting options have changed and make an
        operation to represent them in state changes (in case Python code in
        migrations needs them).
        """
        for package_label, model_name in sorted(self.kept_model_keys):
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_model_state = self.from_state.models[package_label, old_model_name]
            new_model_state = self.to_state.models[package_label, model_name]
            old_options = {
                key: value
                for key, value in old_model_state.options.items()
                if key in AlterModelOptions.ALTER_OPTION_KEYS
            }
            new_options = {
                key: value
                for key, value in new_model_state.options.items()
                if key in AlterModelOptions.ALTER_OPTION_KEYS
            }
            if old_options != new_options:
                self.add_operation(
                    package_label,
                    operations.AlterModelOptions(
                        name=model_name,
                        options=new_options,
                    ),
                )

    def arrange_for_graph(self, changes, graph, migration_name=None):
        """
        Take a result from changes() and a MigrationGraph, and fix the names
        and dependencies of the changes so they extend the graph from the leaf
        nodes for each app.
        """
        leaves = graph.leaf_nodes()
        name_map = {}
        for package_label, migrations in list(changes.items()):
            if not migrations:
                continue
            # Find the app label's current leaf node
            app_leaf = None
            for leaf in leaves:
                if leaf[0] == package_label:
                    app_leaf = leaf
                    break
            # Do they want an initial migration for this app?
            if app_leaf is None and not self.questioner.ask_initial(package_label):
                # They don't.
                for migration in migrations:
                    name_map[(package_label, migration.name)] = (
                        package_label,
                        "__first__",
                    )
                del changes[package_label]
                continue
            # Work out the next number in the sequence
            if app_leaf is None:
                next_number = 1
            else:
                next_number = (self.parse_number(app_leaf[1]) or 0) + 1
            # Name each migration
            for i, migration in enumerate(migrations):
                if i == 0 and app_leaf:
                    migration.dependencies.append(app_leaf)
                new_name_parts = ["%04i" % next_number]  # noqa: UP031
                if migration_name:
                    new_name_parts.append(migration_name)
                elif i == 0 and not app_leaf:
                    new_name_parts.append("initial")
                else:
                    new_name_parts.append(migration.suggest_name()[:100])
                new_name = "_".join(new_name_parts)
                name_map[(package_label, migration.name)] = (package_label, new_name)
                next_number += 1
                migration.name = new_name
        # Now fix dependencies
        for migrations in changes.values():
            for migration in migrations:
                migration.dependencies = [
                    name_map.get(d, d) for d in migration.dependencies
                ]
        return changes

    def _trim_to_packages(self, changes, package_labels):
        """
        Take changes from arrange_for_graph() and a set of app labels, and
        return a modified set of changes which trims out as many migrations as
        possible that are not in package_labels. Note that some other
        migrations may still be present as they may be required dependencies.
        """
        # Gather other app dependencies in a first pass
        app_dependencies = {}
        for package_label, migrations in changes.items():
            for migration in migrations:
                for dep_package_label, name in migration.dependencies:
                    app_dependencies.setdefault(package_label, set()).add(
                        dep_package_label
                    )
        required_packages = set(package_labels)
        # Keep resolving till there's no change
        old_required_packages = None
        while old_required_packages != required_packages:
            old_required_packages = set(required_packages)
            required_packages.update(
                *[
                    app_dependencies.get(package_label, ())
                    for package_label in required_packages
                ]
            )
        # Remove all migrations that aren't needed
        for package_label in list(changes):
            if package_label not in required_packages:
                del changes[package_label]
        return changes

    @classmethod
    def parse_number(cls, name):
        """
        Given a migration name, try to extract a number from the beginning of
        it. For a squashed migration such as '0001_squashed_0004…', return the
        second number. If no number is found, return None.
        """
        if squashed_match := re.search(r".*_squashed_(\d+)", name):
            return int(squashed_match[1])
        match = re.match(r"^\d+", name)
        if match:
            return int(match[0])
        return None