Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import functools
   4import re
   5from graphlib import TopologicalSorter
   6from typing import TYPE_CHECKING, Any
   7
   8from plain.models.fields import (
   9    NOT_PROVIDED,
  10    DateField,
  11    DateTimeField,
  12    Field,
  13    TimeField,
  14)
  15from plain.models.migrations import operations
  16from plain.models.migrations.migration import Migration, SettingsTuple
  17from plain.models.migrations.operations.models import AlterModelOptions
  18from plain.models.migrations.optimizer import MigrationOptimizer
  19from plain.models.migrations.questioner import MigrationQuestioner
  20from plain.models.migrations.utils import (
  21    COMPILED_REGEX_TYPE,
  22    RegexObject,
  23    resolve_relation,
  24)
  25from plain.runtime import settings
  26
  27if TYPE_CHECKING:
  28    from plain.models.migrations.graph import MigrationGraph
  29    from plain.models.migrations.operations.base import Operation
  30    from plain.models.migrations.state import ProjectState
  31
  32
  33class MigrationAutodetector:
  34    """
  35    Take a pair of ProjectStates and compare them to see what the first would
  36    need doing to make it match the second (the second usually being the
  37    project's current state).
  38
  39    Note that this naturally operates on entire projects at a time,
  40    as it's likely that changes interact (for example, you can't
  41    add a ForeignKey without having a migration to add the table it
  42    depends on first). A user interface may offer single-app usage
  43    if it wishes, with the caveat that it may not always be possible.
  44    """
  45
  46    def __init__(
  47        self,
  48        from_state: ProjectState,
  49        to_state: ProjectState,
  50        questioner: MigrationQuestioner | None = None,
  51    ):
  52        self.from_state = from_state
  53        self.to_state = to_state
  54        self.questioner = questioner or MigrationQuestioner()
  55        self.existing_packages = {app for app, model in from_state.models}
  56
  57    def changes(
  58        self,
  59        graph: MigrationGraph,
  60        trim_to_packages: set[str] | None = None,
  61        convert_packages: set[str] | None = None,
  62        migration_name: str | None = None,
  63    ) -> dict[str, list[Migration]]:
  64        """
  65        Main entry point to produce a list of applicable changes.
  66        Take a graph to base names on and an optional set of packages
  67        to try and restrict to (restriction is not guaranteed)
  68        """
  69        changes = self._detect_changes(convert_packages, graph)
  70        changes = self.arrange_for_graph(changes, graph, migration_name)
  71        if trim_to_packages:
  72            changes = self._trim_to_packages(changes, trim_to_packages)
  73        return changes
  74
  75    def deep_deconstruct(self, obj: Any) -> Any:
  76        """
  77        Recursive deconstruction for a field and its arguments.
  78        Used for full comparison for rename/alter; sometimes a single-level
  79        deconstruction will not compare correctly.
  80        """
  81        if isinstance(obj, list):
  82            return [self.deep_deconstruct(value) for value in obj]
  83        elif isinstance(obj, tuple):
  84            return tuple(self.deep_deconstruct(value) for value in obj)
  85        elif isinstance(obj, dict):
  86            return {key: self.deep_deconstruct(value) for key, value in obj.items()}
  87        elif isinstance(obj, functools.partial):
  88            return (
  89                obj.func,
  90                self.deep_deconstruct(obj.args),
  91                self.deep_deconstruct(obj.keywords),
  92            )
  93        elif isinstance(obj, COMPILED_REGEX_TYPE):
  94            return RegexObject(obj)
  95        elif isinstance(obj, type):
  96            # If this is a type that implements 'deconstruct' as an instance method,
  97            # avoid treating this as being deconstructible itself - see #22951
  98            return obj
  99        elif hasattr(obj, "deconstruct"):
 100            deconstructed = obj.deconstruct()
 101            if isinstance(obj, Field):
 102                # we have a field which also returns a name
 103                deconstructed = deconstructed[1:]
 104            path, args, kwargs = deconstructed
 105            return (
 106                path,
 107                [self.deep_deconstruct(value) for value in args],
 108                {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
 109            )
 110        else:
 111            return obj
 112
 113    def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
 114        """
 115        Return a definition of the fields that ignores field names and
 116        what related fields actually relate to. Used for detecting renames (as
 117        the related fields change during renames).
 118        """
 119        fields_def = []
 120        for name, field in sorted(fields.items()):
 121            deconstruction = self.deep_deconstruct(field)
 122            if field.remote_field and field.remote_field.model:
 123                deconstruction[2].pop("to", None)
 124            fields_def.append(deconstruction)
 125        return fields_def
 126
 127    def _detect_changes(
 128        self,
 129        convert_packages: set[str] | None = None,
 130        graph: MigrationGraph | None = None,
 131    ) -> dict[str, list[Migration]]:
 132        """
 133        Return a dict of migration plans which will achieve the
 134        change from from_state to to_state. The dict has app labels
 135        as keys and a list of migrations as values.
 136
 137        The resulting migrations aren't specially named, but the names
 138        do matter for dependencies inside the set.
 139
 140        convert_packages is the list of packages to convert to use migrations
 141        (i.e. to make initial migrations for, in the usual case)
 142
 143        graph is an optional argument that, if provided, can help improve
 144        dependency generation and avoid potential circular dependencies.
 145        """
 146        # The first phase is generating all the operations for each app
 147        # and gathering them into a big per-app list.
 148        # Then go through that list, order it, and split into migrations to
 149        # resolve dependencies caused by M2Ms and FKs.
 150        self.generated_operations = {}
 151        self.altered_indexes = {}
 152        self.altered_constraints = {}
 153        self.renamed_fields = {}
 154
 155        # Prepare some old/new state and model lists, ignoring unmigrated packages.
 156        self.old_model_keys = set()
 157        self.new_model_keys = set()
 158        for (package_label, model_name), model_state in self.from_state.models.items():
 159            if package_label not in self.from_state.real_packages:
 160                self.old_model_keys.add((package_label, model_name))
 161
 162        for (package_label, model_name), model_state in self.to_state.models.items():
 163            if package_label not in self.from_state.real_packages or (
 164                convert_packages and package_label in convert_packages
 165            ):
 166                self.new_model_keys.add((package_label, model_name))
 167
 168        self.from_state.resolve_fields_and_relations()
 169        self.to_state.resolve_fields_and_relations()
 170
 171        # Renames have to come first
 172        self.generate_renamed_models()
 173
 174        # Prepare lists of fields and generate through model map
 175        self._prepare_field_lists()
 176        self._generate_through_model_map()
 177
 178        # Generate non-rename model operations
 179        self.generate_deleted_models()
 180        self.generate_created_models()
 181        self.generate_altered_options()
 182        self.generate_altered_db_table_comment()
 183
 184        # Create the renamed fields and store them in self.renamed_fields.
 185        # They are used by create_altered_indexes(), generate_altered_fields(),
 186        # generate_removed_altered_index(), and
 187        # generate_altered_index().
 188        self.create_renamed_fields()
 189        # Create the altered indexes and store them in self.altered_indexes.
 190        # This avoids the same computation in generate_removed_indexes()
 191        # and generate_added_indexes().
 192        self.create_altered_indexes()
 193        self.create_altered_constraints()
 194        # Generate index removal operations before field is removed
 195        self.generate_removed_constraints()
 196        self.generate_removed_indexes()
 197        # Generate field renaming operations.
 198        self.generate_renamed_fields()
 199        self.generate_renamed_indexes()
 200        # Generate field operations.
 201        self.generate_removed_fields()
 202        self.generate_added_fields()
 203        self.generate_altered_fields()
 204        self.generate_added_indexes()
 205        self.generate_added_constraints()
 206        self.generate_altered_db_table()
 207
 208        self._sort_migrations()
 209        self._build_migration_list(graph)
 210        self._optimize_migrations()
 211
 212        return self.migrations
 213
 214    def _prepare_field_lists(self) -> None:
 215        """
 216        Prepare field lists and a list of the fields that used through models
 217        in the old state so dependencies can be made from the through model
 218        deletion to the field that uses it.
 219        """
 220        self.kept_model_keys = self.old_model_keys & self.new_model_keys
 221        self.through_users = {}
 222        self.old_field_keys = {
 223            (package_label, model_name, field_name)
 224            for package_label, model_name in self.kept_model_keys
 225            for field_name in self.from_state.models[
 226                package_label,
 227                self.renamed_models.get((package_label, model_name), model_name),
 228            ].fields
 229        }
 230        self.new_field_keys = {
 231            (package_label, model_name, field_name)
 232            for package_label, model_name in self.kept_model_keys
 233            for field_name in self.to_state.models[package_label, model_name].fields
 234        }
 235
 236    def _generate_through_model_map(self) -> None:
 237        """Through model map generation."""
 238        for package_label, model_name in sorted(self.old_model_keys):
 239            old_model_name = self.renamed_models.get(
 240                (package_label, model_name), model_name
 241            )
 242            old_model_state = self.from_state.models[package_label, old_model_name]
 243            for field_name, field in old_model_state.fields.items():
 244                if hasattr(field, "remote_field") and getattr(
 245                    field.remote_field, "through", None
 246                ):
 247                    through_key = resolve_relation(
 248                        field.remote_field.through, package_label, model_name
 249                    )
 250                    self.through_users[through_key] = (
 251                        package_label,
 252                        old_model_name,
 253                        field_name,
 254                    )
 255
 256    @staticmethod
 257    def _resolve_dependency(
 258        dependency: tuple[str, str, str | None, bool | str],
 259    ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
 260        """
 261        Return the resolved dependency and a boolean denoting whether or not
 262        it was a settings dependency.
 263        """
 264        if not isinstance(dependency, SettingsTuple):
 265            return dependency, False
 266        resolved_package_label, resolved_object_name = getattr(
 267            settings, dependency[1]
 268        ).split(".")
 269        return (resolved_package_label, resolved_object_name.lower()) + dependency[
 270            2:
 271        ], True
 272
 273    def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
 274        """
 275        Chop the lists of operations up into migrations with dependencies on
 276        each other. Do this by going through an app's list of operations until
 277        one is found that has an outgoing dependency that isn't in another
 278        app's migration yet (hasn't been chopped off its list). Then chop off
 279        the operations before it into a migration and move onto the next app.
 280        If the loops completes without doing anything, there's a circular
 281        dependency (which _should_ be impossible as the operations are
 282        all split at this point so they can't depend and be depended on).
 283        """
 284        self.migrations = {}
 285        num_ops = sum(len(x) for x in self.generated_operations.values())
 286        chop_mode = False
 287        while num_ops:
 288            # On every iteration, we step through all the packages and see if there
 289            # is a completed set of operations.
 290            # If we find that a subset of the operations are complete we can
 291            # try to chop it off from the rest and continue, but we only
 292            # do this if we've already been through the list once before
 293            # without any chopping and nothing has changed.
 294            for package_label in sorted(self.generated_operations):
 295                chopped = []
 296                dependencies = set()
 297                for operation in list(self.generated_operations[package_label]):
 298                    deps_satisfied = True
 299                    operation_dependencies = set()
 300                    for dep in operation._auto_deps:
 301                        # Temporarily resolve the settings dependency to
 302                        # prevent circular references. While keeping the
 303                        # dependency checks on the resolved model, add the
 304                        # settings dependencies.
 305                        original_dep = dep
 306                        dep, is_settings_dep = self._resolve_dependency(dep)
 307                        if dep[0] != package_label:
 308                            # External app dependency. See if it's not yet
 309                            # satisfied.
 310                            for other_operation in self.generated_operations.get(
 311                                dep[0], []
 312                            ):
 313                                if self.check_dependency(other_operation, dep):
 314                                    deps_satisfied = False
 315                                    break
 316                            if not deps_satisfied:
 317                                break
 318                            else:
 319                                if is_settings_dep:
 320                                    operation_dependencies.add(
 321                                        (original_dep[0], original_dep[1])
 322                                    )
 323                                elif dep[0] in self.migrations:
 324                                    operation_dependencies.add(
 325                                        (dep[0], self.migrations[dep[0]][-1].name)
 326                                    )
 327                                else:
 328                                    # If we can't find the other app, we add a
 329                                    # first/last dependency, but only if we've
 330                                    # already been through once and checked
 331                                    # everything.
 332                                    if chop_mode:
 333                                        # If the app already exists, we add a
 334                                        # dependency on the last migration, as
 335                                        # we don't know which migration
 336                                        # contains the target field. If it's
 337                                        # not yet migrated or has no
 338                                        # migrations, we use __first__.
 339                                        if graph and graph.leaf_nodes(dep[0]):
 340                                            operation_dependencies.add(
 341                                                graph.leaf_nodes(dep[0])[0]
 342                                            )
 343                                        else:
 344                                            operation_dependencies.add(
 345                                                (dep[0], "__first__")
 346                                            )
 347                                    else:
 348                                        deps_satisfied = False
 349                    if deps_satisfied:
 350                        chopped.append(operation)
 351                        dependencies.update(operation_dependencies)
 352                        del self.generated_operations[package_label][0]
 353                    else:
 354                        break
 355                # Make a migration! Well, only if there's stuff to put in it
 356                if dependencies or chopped:
 357                    if not self.generated_operations[package_label] or chop_mode:
 358                        subclass = type(
 359                            "Migration",
 360                            (Migration,),
 361                            {"operations": [], "dependencies": []},
 362                        )
 363                        instance = subclass(
 364                            "auto_%i"  # noqa: UP031
 365                            % (len(self.migrations.get(package_label, [])) + 1),
 366                            package_label,
 367                        )
 368                        instance.dependencies = list(dependencies)
 369                        instance.operations = chopped
 370                        instance.initial = package_label not in self.existing_packages
 371                        self.migrations.setdefault(package_label, []).append(instance)
 372                        chop_mode = False
 373                    else:
 374                        self.generated_operations[package_label] = (
 375                            chopped + self.generated_operations[package_label]
 376                        )
 377            new_num_ops = sum(len(x) for x in self.generated_operations.values())
 378            if new_num_ops == num_ops:
 379                if not chop_mode:
 380                    chop_mode = True
 381                else:
 382                    raise ValueError(
 383                        f"Cannot resolve operation dependencies: {self.generated_operations!r}"
 384                    )
 385            num_ops = new_num_ops
 386
 387    def _sort_migrations(self) -> None:
 388        """
 389        Reorder to make things possible. Reordering may be needed so FKs work
 390        nicely inside the same app.
 391        """
 392        for package_label, ops in sorted(self.generated_operations.items()):
 393            ts = TopologicalSorter()
 394            for op in ops:
 395                ts.add(op)
 396                for dep in op._auto_deps:
 397                    # Resolve intra-app dependencies to handle circular
 398                    # references involving a settings model.
 399                    dep = self._resolve_dependency(dep)[0]
 400                    if dep[0] != package_label:
 401                        continue
 402                    ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
 403            self.generated_operations[package_label] = list(ts.static_order())
 404
 405    def _optimize_migrations(self) -> None:
 406        # Add in internal dependencies among the migrations
 407        for package_label, migrations in self.migrations.items():
 408            for m1, m2 in zip(migrations, migrations[1:]):
 409                m2.dependencies.append((package_label, m1.name))
 410
 411        # De-dupe dependencies
 412        for migrations in self.migrations.values():
 413            for migration in migrations:
 414                migration.dependencies = list(set(migration.dependencies))
 415
 416        # Optimize migrations
 417        for package_label, migrations in self.migrations.items():
 418            for migration in migrations:
 419                migration.operations = MigrationOptimizer().optimize(
 420                    migration.operations, package_label
 421                )
 422
 423    def check_dependency(
 424        self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
 425    ) -> bool:
 426        """
 427        Return True if the given operation depends on the given dependency,
 428        False otherwise.
 429        """
 430        # Created model
 431        if dependency[2] is None and dependency[3] is True:
 432            return (
 433                isinstance(operation, operations.CreateModel)
 434                and operation.name_lower == dependency[1].lower()
 435            )
 436        # Created field
 437        elif dependency[2] is not None and dependency[3] is True:
 438            return (
 439                isinstance(operation, operations.CreateModel)
 440                and operation.name_lower == dependency[1].lower()
 441                and any(dependency[2] == x for x, y in operation.fields)
 442            ) or (
 443                isinstance(operation, operations.AddField)
 444                and operation.model_name_lower == dependency[1].lower()
 445                and operation.name_lower == dependency[2].lower()
 446            )
 447        # Removed field
 448        elif dependency[2] is not None and dependency[3] is False:
 449            return (
 450                isinstance(operation, operations.RemoveField)
 451                and operation.model_name_lower == dependency[1].lower()
 452                and operation.name_lower == dependency[2].lower()
 453            )
 454        # Removed model
 455        elif dependency[2] is None and dependency[3] is False:
 456            return (
 457                isinstance(operation, operations.DeleteModel)
 458                and operation.name_lower == dependency[1].lower()
 459            )
 460        # Field being altered
 461        elif dependency[2] is not None and dependency[3] == "alter":
 462            return (
 463                isinstance(operation, operations.AlterField)
 464                and operation.model_name_lower == dependency[1].lower()
 465                and operation.name_lower == dependency[2].lower()
 466            )
 467        # Unknown dependency. Raise an error.
 468        else:
 469            raise ValueError(f"Can't handle dependency {dependency!r}")
 470
 471    def add_operation(
 472        self,
 473        package_label: str,
 474        operation: Operation,
 475        dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
 476        beginning: bool = False,
 477    ) -> None:
 478        # Operation dependencies are 4-element tuples:
 479        # (package_label, model_name, field_name, create/delete as True/False or "alter")
 480        operation._auto_deps = dependencies or []  # type: ignore[attr-defined]
 481        if beginning:
 482            self.generated_operations.setdefault(package_label, []).insert(0, operation)
 483        else:
 484            self.generated_operations.setdefault(package_label, []).append(operation)
 485
 486    def generate_renamed_models(self) -> None:
 487        """
 488        Find any renamed models, generate the operations for them, and remove
 489        the old entry from the model lists. Must be run before other
 490        model-level generation.
 491        """
 492        self.renamed_models = {}
 493        self.renamed_models_rel = {}
 494        added_models = self.new_model_keys - self.old_model_keys
 495        for package_label, model_name in sorted(added_models):
 496            model_state = self.to_state.models[package_label, model_name]
 497            model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
 498
 499            removed_models = self.old_model_keys - self.new_model_keys
 500            for rem_package_label, rem_model_name in removed_models:
 501                if rem_package_label == package_label:
 502                    rem_model_state = self.from_state.models[
 503                        rem_package_label, rem_model_name
 504                    ]
 505                    rem_model_fields_def = self.only_relation_agnostic_fields(
 506                        rem_model_state.fields
 507                    )
 508                    if model_fields_def == rem_model_fields_def:
 509                        if self.questioner.ask_rename_model(
 510                            rem_model_state, model_state
 511                        ):
 512                            dependencies = []
 513                            fields = list(model_state.fields.values()) + [
 514                                field.remote_field
 515                                for relations in self.to_state.relations[
 516                                    package_label, model_name
 517                                ].values()
 518                                for field in relations.values()
 519                            ]
 520                            for field in fields:
 521                                if field.is_relation:
 522                                    dependencies.extend(
 523                                        self._get_dependencies_for_foreign_key(
 524                                            package_label,
 525                                            model_name,
 526                                            field,
 527                                            self.to_state,
 528                                        )
 529                                    )
 530                            self.add_operation(
 531                                package_label,
 532                                operations.RenameModel(
 533                                    old_name=rem_model_state.name,
 534                                    new_name=model_state.name,
 535                                ),
 536                                dependencies=dependencies,
 537                            )
 538                            self.renamed_models[package_label, model_name] = (
 539                                rem_model_name
 540                            )
 541                            renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
 542                            self.renamed_models_rel[renamed_models_rel_key] = (
 543                                f"{model_state.package_label}.{model_state.name_lower}"
 544                            )
 545                            self.old_model_keys.remove(
 546                                (rem_package_label, rem_model_name)
 547                            )
 548                            self.old_model_keys.add((package_label, model_name))
 549                            break
 550
 551    def generate_created_models(self) -> None:
 552        """
 553        Find all new models and make create
 554        operations for them as well as separate operations to create any
 555        foreign key or M2M relationships (these are optimized later, if
 556        possible).
 557
 558        Defer any model options that refer to collections of fields that might
 559        be deferred.
 560        """
 561        added_models = self.new_model_keys - self.old_model_keys
 562
 563        for package_label, model_name in added_models:
 564            model_state = self.to_state.models[package_label, model_name]
 565            # Gather related fields
 566            related_fields = {}
 567            primary_key_rel = None
 568            for field_name, field in model_state.fields.items():
 569                if field.remote_field:
 570                    if field.remote_field.model:
 571                        if field.primary_key:
 572                            primary_key_rel = field.remote_field.model
 573                        else:
 574                            related_fields[field_name] = field
 575                    if getattr(field.remote_field, "through", None):
 576                        related_fields[field_name] = field
 577
 578            # Are there indexes to defer?
 579            indexes = model_state.options.pop("indexes")
 580            constraints = model_state.options.pop("constraints")
 581            # Depend on the deletion of any possible proxy version of us
 582            dependencies = [
 583                (package_label, model_name, None, False),
 584            ]
 585            # Depend on all bases
 586            for base in model_state.bases:
 587                if isinstance(base, str) and "." in base:
 588                    base_package_label, base_name = base.split(".", 1)
 589                    dependencies.append((base_package_label, base_name, None, True))
 590                    # Depend on the removal of base fields if the new model has
 591                    # a field with the same name.
 592                    old_base_model_state = self.from_state.models.get(
 593                        (base_package_label, base_name)
 594                    )
 595                    new_base_model_state = self.to_state.models.get(
 596                        (base_package_label, base_name)
 597                    )
 598                    if old_base_model_state and new_base_model_state:
 599                        removed_base_fields = (
 600                            set(old_base_model_state.fields)
 601                            .difference(
 602                                new_base_model_state.fields,
 603                            )
 604                            .intersection(model_state.fields)
 605                        )
 606                        for removed_base_field in removed_base_fields:
 607                            dependencies.append(
 608                                (
 609                                    base_package_label,
 610                                    base_name,
 611                                    removed_base_field,
 612                                    False,
 613                                )
 614                            )
 615            # Depend on the other end of the primary key if it's a relation
 616            if primary_key_rel:
 617                dependencies.append(
 618                    resolve_relation(
 619                        primary_key_rel,
 620                        package_label,
 621                        model_name,
 622                    )
 623                    + (None, True)
 624                )
 625            # Generate creation operation
 626            self.add_operation(
 627                package_label,
 628                operations.CreateModel(
 629                    name=model_state.name,
 630                    fields=[
 631                        d
 632                        for d in model_state.fields.items()
 633                        if d[0] not in related_fields
 634                    ],
 635                    options=model_state.options,
 636                    bases=model_state.bases,
 637                ),
 638                dependencies=dependencies,
 639                beginning=True,
 640            )
 641
 642            # Generate operations for each related field
 643            for name, field in sorted(related_fields.items()):
 644                dependencies = self._get_dependencies_for_foreign_key(
 645                    package_label,
 646                    model_name,
 647                    field,
 648                    self.to_state,
 649                )
 650                # Depend on our own model being created
 651                dependencies.append((package_label, model_name, None, True))
 652                # Make operation
 653                self.add_operation(
 654                    package_label,
 655                    operations.AddField(
 656                        model_name=model_name,
 657                        name=name,
 658                        field=field,
 659                    ),
 660                    dependencies=list(set(dependencies)),
 661                )
 662
 663            related_dependencies = [
 664                (package_label, model_name, name, True)
 665                for name in sorted(related_fields)
 666            ]
 667            related_dependencies.append((package_label, model_name, None, True))
 668            for index in indexes:
 669                self.add_operation(
 670                    package_label,
 671                    operations.AddIndex(
 672                        model_name=model_name,
 673                        index=index,
 674                    ),
 675                    dependencies=related_dependencies,
 676                )
 677            for constraint in constraints:
 678                self.add_operation(
 679                    package_label,
 680                    operations.AddConstraint(
 681                        model_name=model_name,
 682                        constraint=constraint,
 683                    ),
 684                    dependencies=related_dependencies,
 685                )
 686
 687    def generate_deleted_models(self) -> None:
 688        """
 689        Find all deleted models and make delete
 690        operations for them as well as separate operations to delete any
 691        foreign key or M2M relationships (these are optimized later, if
 692        possible).
 693
 694        Also bring forward removal of any model options that refer to
 695        collections of fields - the inverse of generate_created_models().
 696        """
 697        deleted_models = self.old_model_keys - self.new_model_keys
 698
 699        for package_label, model_name in sorted(deleted_models):
 700            model_state = self.from_state.models[package_label, model_name]
 701            # Gather related fields
 702            related_fields = {}
 703            for field_name, field in model_state.fields.items():
 704                if field.remote_field:
 705                    if field.remote_field.model:
 706                        related_fields[field_name] = field
 707                    if getattr(field.remote_field, "through", None):
 708                        related_fields[field_name] = field
 709
 710            # Then remove each related field
 711            for name in sorted(related_fields):
 712                self.add_operation(
 713                    package_label,
 714                    operations.RemoveField(
 715                        model_name=model_name,
 716                        name=name,
 717                    ),
 718                )
 719            # Finally, remove the model.
 720            # This depends on both the removal/alteration of all incoming fields
 721            # and the removal of all its own related fields, and if it's
 722            # a through model the field that references it.
 723            dependencies = []
 724            relations = self.from_state.relations
 725            for (
 726                related_object_package_label,
 727                object_name,
 728            ), relation_related_fields in relations[package_label, model_name].items():
 729                for field_name, field in relation_related_fields.items():
 730                    dependencies.append(
 731                        (related_object_package_label, object_name, field_name, False),
 732                    )
 733                    if not field.many_to_many:
 734                        dependencies.append(
 735                            (
 736                                related_object_package_label,
 737                                object_name,
 738                                field_name,
 739                                "alter",
 740                            ),
 741                        )
 742
 743            for name in sorted(related_fields):
 744                dependencies.append((package_label, model_name, name, False))
 745            # We're referenced in another field's through=
 746            through_user = self.through_users.get(
 747                (package_label, model_state.name_lower)
 748            )
 749            if through_user:
 750                dependencies.append(
 751                    (through_user[0], through_user[1], through_user[2], False)
 752                )
 753            # Finally, make the operation, deduping any dependencies
 754            self.add_operation(
 755                package_label,
 756                operations.DeleteModel(
 757                    name=model_state.name,
 758                ),
 759                dependencies=list(set(dependencies)),
 760            )
 761
 762    def create_renamed_fields(self) -> None:
 763        """Work out renamed fields."""
 764        self.renamed_operations = []
 765        old_field_keys = self.old_field_keys.copy()
 766        for package_label, model_name, field_name in sorted(
 767            self.new_field_keys - old_field_keys
 768        ):
 769            old_model_name = self.renamed_models.get(
 770                (package_label, model_name), model_name
 771            )
 772            old_model_state = self.from_state.models[package_label, old_model_name]
 773            new_model_state = self.to_state.models[package_label, model_name]
 774            field = new_model_state.get_field(field_name)
 775            # Scan to see if this is actually a rename!
 776            field_dec = self.deep_deconstruct(field)
 777            for rem_package_label, rem_model_name, rem_field_name in sorted(
 778                old_field_keys - self.new_field_keys
 779            ):
 780                if rem_package_label == package_label and rem_model_name == model_name:
 781                    old_field = old_model_state.get_field(rem_field_name)
 782                    old_field_dec = self.deep_deconstruct(old_field)
 783                    if (
 784                        field.remote_field
 785                        and field.remote_field.model
 786                        and "to" in old_field_dec[2]
 787                    ):
 788                        old_rel_to = old_field_dec[2]["to"]
 789                        if old_rel_to in self.renamed_models_rel:
 790                            old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
 791                    old_field.set_attributes_from_name(rem_field_name)
 792                    old_db_column = old_field.get_attname_column()[1]
 793                    if old_field_dec == field_dec or (
 794                        # Was the field renamed and db_column equal to the
 795                        # old field's column added?
 796                        old_field_dec[0:2] == field_dec[0:2]
 797                        and dict(old_field_dec[2], db_column=old_db_column)
 798                        == field_dec[2]
 799                    ):
 800                        if self.questioner.ask_rename(
 801                            model_name, rem_field_name, field_name, field
 802                        ):
 803                            self.renamed_operations.append(
 804                                (
 805                                    rem_package_label,
 806                                    rem_model_name,
 807                                    old_field.db_column,
 808                                    rem_field_name,
 809                                    package_label,
 810                                    model_name,
 811                                    field,
 812                                    field_name,
 813                                )
 814                            )
 815                            old_field_keys.remove(
 816                                (rem_package_label, rem_model_name, rem_field_name)
 817                            )
 818                            old_field_keys.add((package_label, model_name, field_name))
 819                            self.renamed_fields[
 820                                package_label, model_name, field_name
 821                            ] = rem_field_name
 822                            break
 823
 824    def generate_renamed_fields(self) -> None:
 825        """Generate RenameField operations."""
 826        for (
 827            rem_package_label,
 828            rem_model_name,
 829            rem_db_column,
 830            rem_field_name,
 831            package_label,
 832            model_name,
 833            field,
 834            field_name,
 835        ) in self.renamed_operations:
 836            # A db_column mismatch requires a prior noop AlterField for the
 837            # subsequent RenameField to be a noop on attempts at preserving the
 838            # old name.
 839            if rem_db_column != field.db_column:
 840                altered_field = field.clone()
 841                altered_field.name = rem_field_name
 842                self.add_operation(
 843                    package_label,
 844                    operations.AlterField(
 845                        model_name=model_name,
 846                        name=rem_field_name,
 847                        field=altered_field,
 848                    ),
 849                )
 850            self.add_operation(
 851                package_label,
 852                operations.RenameField(
 853                    model_name=model_name,
 854                    old_name=rem_field_name,
 855                    new_name=field_name,
 856                ),
 857            )
 858            self.old_field_keys.remove(
 859                (rem_package_label, rem_model_name, rem_field_name)
 860            )
 861            self.old_field_keys.add((package_label, model_name, field_name))
 862
 863    def generate_added_fields(self) -> None:
 864        """Make AddField operations."""
 865        for package_label, model_name, field_name in sorted(
 866            self.new_field_keys - self.old_field_keys
 867        ):
 868            self._generate_added_field(package_label, model_name, field_name)
 869
 870    def _generate_added_field(
 871        self, package_label: str, model_name: str, field_name: str
 872    ) -> None:
 873        field = self.to_state.models[package_label, model_name].get_field(field_name)
 874        # Adding a field always depends at least on its removal.
 875        dependencies = [(package_label, model_name, field_name, False)]
 876        # Fields that are foreignkeys/m2ms depend on stuff.
 877        if field.remote_field and field.remote_field.model:
 878            dependencies.extend(
 879                self._get_dependencies_for_foreign_key(
 880                    package_label,
 881                    model_name,
 882                    field,
 883                    self.to_state,
 884                )
 885            )
 886        # You can't just add NOT NULL fields with no default or fields
 887        # which don't allow empty strings as default.
 888        time_fields = (DateField, DateTimeField, TimeField)
 889        preserve_default = (
 890            field.allow_null
 891            or field.has_default()
 892            or field.many_to_many
 893            or (not field.required and field.empty_strings_allowed)
 894            or (isinstance(field, time_fields) and field.auto_now)
 895        )
 896        if not preserve_default:
 897            field = field.clone()
 898            if isinstance(field, time_fields) and field.auto_now_add:
 899                field.default = self.questioner.ask_auto_now_add_addition(
 900                    field_name, model_name
 901                )
 902            else:
 903                field.default = self.questioner.ask_not_null_addition(
 904                    field_name, model_name
 905                )
 906        if (
 907            field.primary_key
 908            and field.default is not NOT_PROVIDED
 909            and callable(field.default)
 910        ):
 911            self.questioner.ask_unique_callable_default_addition(field_name, model_name)
 912        self.add_operation(
 913            package_label,
 914            operations.AddField(
 915                model_name=model_name,
 916                name=field_name,
 917                field=field,
 918                preserve_default=preserve_default,
 919            ),
 920            dependencies=dependencies,
 921        )
 922
 923    def generate_removed_fields(self) -> None:
 924        """Make RemoveField operations."""
 925        for package_label, model_name, field_name in sorted(
 926            self.old_field_keys - self.new_field_keys
 927        ):
 928            self._generate_removed_field(package_label, model_name, field_name)
 929
 930    def _generate_removed_field(
 931        self, package_label: str, model_name: str, field_name: str
 932    ) -> None:
 933        self.add_operation(
 934            package_label,
 935            operations.RemoveField(
 936                model_name=model_name,
 937                name=field_name,
 938            ),
 939        )
 940
 941    def generate_altered_fields(self) -> None:
 942        """
 943        Make AlterField operations, or possibly RemovedField/AddField if alter
 944        isn't possible.
 945        """
 946        for package_label, model_name, field_name in sorted(
 947            self.old_field_keys & self.new_field_keys
 948        ):
 949            # Did the field change?
 950            old_model_name = self.renamed_models.get(
 951                (package_label, model_name), model_name
 952            )
 953            old_field_name = self.renamed_fields.get(
 954                (package_label, model_name, field_name), field_name
 955            )
 956            old_field = self.from_state.models[package_label, old_model_name].get_field(
 957                old_field_name
 958            )
 959            new_field = self.to_state.models[package_label, model_name].get_field(
 960                field_name
 961            )
 962            dependencies = []
 963            # Implement any model renames on relations; these are handled by RenameModel
 964            # so we need to exclude them from the comparison
 965            if hasattr(new_field, "remote_field") and getattr(
 966                new_field.remote_field, "model", None
 967            ):
 968                rename_key = resolve_relation(
 969                    new_field.remote_field.model, package_label, model_name
 970                )
 971                if rename_key in self.renamed_models:
 972                    new_field.remote_field.model = old_field.remote_field.model
 973                # Handle ForeignKey which can only have a single to_field.
 974                remote_field_name = getattr(new_field.remote_field, "field_name", None)
 975                if remote_field_name:
 976                    to_field_rename_key = rename_key + (remote_field_name,)
 977                    if to_field_rename_key in self.renamed_fields:
 978                        # Repoint model name only
 979                        new_field.remote_field.model = old_field.remote_field.model
 980                dependencies.extend(
 981                    self._get_dependencies_for_foreign_key(
 982                        package_label,
 983                        model_name,
 984                        new_field,
 985                        self.to_state,
 986                    )
 987                )
 988            if hasattr(new_field, "remote_field") and getattr(
 989                new_field.remote_field, "through", None
 990            ):
 991                rename_key = resolve_relation(
 992                    new_field.remote_field.through, package_label, model_name
 993                )
 994                if rename_key in self.renamed_models:
 995                    new_field.remote_field.through = old_field.remote_field.through
 996            old_field_dec = self.deep_deconstruct(old_field)
 997            new_field_dec = self.deep_deconstruct(new_field)
 998            # If the field was confirmed to be renamed it means that only
 999            # db_column was allowed to change which generate_renamed_fields()
1000            # already accounts for by adding an AlterField operation.
1001            if old_field_dec != new_field_dec and old_field_name == field_name:
1002                both_m2m = old_field.many_to_many and new_field.many_to_many
1003                neither_m2m = not old_field.many_to_many and not new_field.many_to_many
1004                if both_m2m or neither_m2m:
1005                    # Either both fields are m2m or neither is
1006                    preserve_default = True
1007                    if (
1008                        old_field.allow_null
1009                        and not new_field.allow_null
1010                        and not new_field.has_default()
1011                        and not new_field.many_to_many
1012                    ):
1013                        field = new_field.clone()
1014                        new_default = self.questioner.ask_not_null_alteration(
1015                            field_name, model_name
1016                        )
1017                        if new_default is not NOT_PROVIDED:
1018                            field.default = new_default
1019                            preserve_default = False
1020                    else:
1021                        field = new_field
1022                    self.add_operation(
1023                        package_label,
1024                        operations.AlterField(
1025                            model_name=model_name,
1026                            name=field_name,
1027                            field=field,
1028                            preserve_default=preserve_default,
1029                        ),
1030                        dependencies=dependencies,
1031                    )
1032                else:
1033                    # We cannot alter between m2m and concrete fields
1034                    self._generate_removed_field(package_label, model_name, field_name)
1035                    self._generate_added_field(package_label, model_name, field_name)
1036
1037    def create_altered_indexes(self) -> None:
1038        option_name = operations.AddIndex.option_name
1039
1040        for package_label, model_name in sorted(self.kept_model_keys):
1041            old_model_name = self.renamed_models.get(
1042                (package_label, model_name), model_name
1043            )
1044            old_model_state = self.from_state.models[package_label, old_model_name]
1045            new_model_state = self.to_state.models[package_label, model_name]
1046
1047            old_indexes = old_model_state.options[option_name]
1048            new_indexes = new_model_state.options[option_name]
1049            added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1050            removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1051            renamed_indexes = []
1052            # Find renamed indexes.
1053            remove_from_added = []
1054            remove_from_removed = []
1055            for new_index in added_indexes:
1056                new_index_dec = new_index.deconstruct()
1057                new_index_name = new_index_dec[2].pop("name")
1058                for old_index in removed_indexes:
1059                    old_index_dec = old_index.deconstruct()
1060                    old_index_name = old_index_dec[2].pop("name")
1061                    # Indexes are the same except for the names.
1062                    if (
1063                        new_index_dec == old_index_dec
1064                        and new_index_name != old_index_name
1065                    ):
1066                        renamed_indexes.append((old_index_name, new_index_name, None))
1067                        remove_from_added.append(new_index)
1068                        remove_from_removed.append(old_index)
1069
1070            # Remove renamed indexes from the lists of added and removed
1071            # indexes.
1072            added_indexes = [
1073                idx for idx in added_indexes if idx not in remove_from_added
1074            ]
1075            removed_indexes = [
1076                idx for idx in removed_indexes if idx not in remove_from_removed
1077            ]
1078
1079            self.altered_indexes.update(
1080                {
1081                    (package_label, model_name): {
1082                        "added_indexes": added_indexes,
1083                        "removed_indexes": removed_indexes,
1084                        "renamed_indexes": renamed_indexes,
1085                    }
1086                }
1087            )
1088
1089    def generate_added_indexes(self) -> None:
1090        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1091            dependencies = self._get_dependencies_for_model(package_label, model_name)
1092            for index in alt_indexes["added_indexes"]:
1093                self.add_operation(
1094                    package_label,
1095                    operations.AddIndex(
1096                        model_name=model_name,
1097                        index=index,
1098                    ),
1099                    dependencies=dependencies,
1100                )
1101
1102    def generate_removed_indexes(self) -> None:
1103        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1104            for index in alt_indexes["removed_indexes"]:
1105                self.add_operation(
1106                    package_label,
1107                    operations.RemoveIndex(
1108                        model_name=model_name,
1109                        name=index.name,
1110                    ),
1111                )
1112
1113    def generate_renamed_indexes(self) -> None:
1114        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1115            for old_index_name, new_index_name, old_fields in alt_indexes[
1116                "renamed_indexes"
1117            ]:
1118                self.add_operation(
1119                    package_label,
1120                    operations.RenameIndex(
1121                        model_name=model_name,
1122                        new_name=new_index_name,
1123                        old_name=old_index_name,
1124                        old_fields=old_fields,
1125                    ),
1126                )
1127
1128    def create_altered_constraints(self) -> None:
1129        option_name = operations.AddConstraint.option_name
1130        for package_label, model_name in sorted(self.kept_model_keys):
1131            old_model_name = self.renamed_models.get(
1132                (package_label, model_name), model_name
1133            )
1134            old_model_state = self.from_state.models[package_label, old_model_name]
1135            new_model_state = self.to_state.models[package_label, model_name]
1136
1137            old_constraints = old_model_state.options[option_name]
1138            new_constraints = new_model_state.options[option_name]
1139            add_constraints = [c for c in new_constraints if c not in old_constraints]
1140            rem_constraints = [c for c in old_constraints if c not in new_constraints]
1141
1142            self.altered_constraints.update(
1143                {
1144                    (package_label, model_name): {
1145                        "added_constraints": add_constraints,
1146                        "removed_constraints": rem_constraints,
1147                    }
1148                }
1149            )
1150
1151    def generate_added_constraints(self) -> None:
1152        for (
1153            package_label,
1154            model_name,
1155        ), alt_constraints in self.altered_constraints.items():
1156            dependencies = self._get_dependencies_for_model(package_label, model_name)
1157            for constraint in alt_constraints["added_constraints"]:
1158                self.add_operation(
1159                    package_label,
1160                    operations.AddConstraint(
1161                        model_name=model_name,
1162                        constraint=constraint,
1163                    ),
1164                    dependencies=dependencies,
1165                )
1166
1167    def generate_removed_constraints(self) -> None:
1168        for (
1169            package_label,
1170            model_name,
1171        ), alt_constraints in self.altered_constraints.items():
1172            for constraint in alt_constraints["removed_constraints"]:
1173                self.add_operation(
1174                    package_label,
1175                    operations.RemoveConstraint(
1176                        model_name=model_name,
1177                        name=constraint.name,
1178                    ),
1179                )
1180
1181    @staticmethod
1182    def _get_dependencies_for_foreign_key(
1183        package_label: str, model_name: str, field: Field, project_state: ProjectState
1184    ) -> list[tuple[str, str, str | None, bool | str]]:
1185        remote_field_model = None
1186        if hasattr(field.remote_field, "model"):
1187            remote_field_model = field.remote_field.model
1188        else:
1189            relations = project_state.relations[package_label, model_name]
1190            for (remote_package_label, remote_model_name), fields in relations.items():
1191                if any(
1192                    field == related_field.remote_field
1193                    for related_field in fields.values()
1194                ):
1195                    remote_field_model = f"{remote_package_label}.{remote_model_name}"
1196                    break
1197        dep_package_label, dep_object_name = resolve_relation(
1198            remote_field_model,
1199            package_label,
1200            model_name,
1201        )
1202        dependencies = [(dep_package_label, dep_object_name, None, True)]
1203        if getattr(field.remote_field, "through", None):
1204            through_package_label, through_object_name = resolve_relation(
1205                field.remote_field.through,  # type: ignore[attr-defined]
1206                package_label,
1207                model_name,
1208            )
1209            dependencies.append(
1210                (through_package_label, through_object_name, None, True)
1211            )
1212        return dependencies
1213
1214    def _get_dependencies_for_model(
1215        self, package_label: str, model_name: str
1216    ) -> list[tuple[str, str, str | None, bool | str]]:
1217        """Return foreign key dependencies of the given model."""
1218        dependencies = []
1219        model_state = self.to_state.models[package_label, model_name]
1220        for field in model_state.fields.values():
1221            if field.is_relation:
1222                dependencies.extend(
1223                    self._get_dependencies_for_foreign_key(
1224                        package_label,
1225                        model_name,
1226                        field,
1227                        self.to_state,
1228                    )
1229                )
1230        return dependencies
1231
1232    def generate_altered_db_table(self) -> None:
1233        for package_label, model_name in sorted(self.kept_model_keys):
1234            old_model_name = self.renamed_models.get(
1235                (package_label, model_name), model_name
1236            )
1237            old_model_state = self.from_state.models[package_label, old_model_name]
1238            new_model_state = self.to_state.models[package_label, model_name]
1239            old_db_table_name = old_model_state.options.get("db_table")
1240            new_db_table_name = new_model_state.options.get("db_table")
1241            if old_db_table_name != new_db_table_name:
1242                self.add_operation(
1243                    package_label,
1244                    operations.AlterModelTable(
1245                        name=model_name,
1246                        table=new_db_table_name,
1247                    ),
1248                )
1249
1250    def generate_altered_db_table_comment(self) -> None:
1251        for package_label, model_name in sorted(self.kept_model_keys):
1252            old_model_name = self.renamed_models.get(
1253                (package_label, model_name), model_name
1254            )
1255            old_model_state = self.from_state.models[package_label, old_model_name]
1256            new_model_state = self.to_state.models[package_label, model_name]
1257
1258            old_db_table_comment = old_model_state.options.get("db_table_comment")
1259            new_db_table_comment = new_model_state.options.get("db_table_comment")
1260            if old_db_table_comment != new_db_table_comment:
1261                self.add_operation(
1262                    package_label,
1263                    operations.AlterModelTableComment(
1264                        name=model_name,
1265                        table_comment=new_db_table_comment,
1266                    ),
1267                )
1268
1269    def generate_altered_options(self) -> None:
1270        """
1271        Work out if any non-schema-affecting options have changed and make an
1272        operation to represent them in state changes (in case Python code in
1273        migrations needs them).
1274        """
1275        for package_label, model_name in sorted(self.kept_model_keys):
1276            old_model_name = self.renamed_models.get(
1277                (package_label, model_name), model_name
1278            )
1279            old_model_state = self.from_state.models[package_label, old_model_name]
1280            new_model_state = self.to_state.models[package_label, model_name]
1281            old_options = {
1282                key: value
1283                for key, value in old_model_state.options.items()
1284                if key in AlterModelOptions.ALTER_OPTION_KEYS
1285            }
1286            new_options = {
1287                key: value
1288                for key, value in new_model_state.options.items()
1289                if key in AlterModelOptions.ALTER_OPTION_KEYS
1290            }
1291            if old_options != new_options:
1292                self.add_operation(
1293                    package_label,
1294                    operations.AlterModelOptions(
1295                        name=model_name,
1296                        options=new_options,
1297                    ),
1298                )
1299
1300    def arrange_for_graph(
1301        self,
1302        changes: dict[str, list[Migration]],
1303        graph: MigrationGraph,
1304        migration_name: str | None = None,
1305    ) -> dict[str, list[Migration]]:
1306        """
1307        Take a result from changes() and a MigrationGraph, and fix the names
1308        and dependencies of the changes so they extend the graph from the leaf
1309        nodes for each app.
1310        """
1311        leaves = graph.leaf_nodes()
1312        name_map = {}
1313        for package_label, migrations in list(changes.items()):
1314            if not migrations:
1315                continue
1316            # Find the app label's current leaf node
1317            app_leaf = None
1318            for leaf in leaves:
1319                if leaf[0] == package_label:
1320                    app_leaf = leaf
1321                    break
1322            # Do they want an initial migration for this app?
1323            if app_leaf is None and not self.questioner.ask_initial(package_label):
1324                # They don't.
1325                for migration in migrations:
1326                    name_map[(package_label, migration.name)] = (
1327                        package_label,
1328                        "__first__",
1329                    )
1330                del changes[package_label]
1331                continue
1332            # Work out the next number in the sequence
1333            if app_leaf is None:
1334                next_number = 1
1335            else:
1336                next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1337            # Name each migration
1338            for i, migration in enumerate(migrations):
1339                if i == 0 and app_leaf:
1340                    migration.dependencies.append(app_leaf)
1341                new_name_parts = ["%04i" % next_number]  # noqa: UP031
1342                if migration_name:
1343                    new_name_parts.append(migration_name)
1344                elif i == 0 and not app_leaf:
1345                    new_name_parts.append("initial")
1346                else:
1347                    new_name_parts.append(migration.suggest_name()[:100])
1348                new_name = "_".join(new_name_parts)
1349                name_map[(package_label, migration.name)] = (package_label, new_name)
1350                next_number += 1
1351                migration.name = new_name
1352        # Now fix dependencies
1353        for migrations in changes.values():
1354            for migration in migrations:
1355                migration.dependencies = [
1356                    name_map.get(d, d) for d in migration.dependencies
1357                ]
1358        return changes
1359
1360    def _trim_to_packages(
1361        self, changes: dict[str, list[Migration]], package_labels: set[str]
1362    ) -> dict[str, list[Migration]]:
1363        """
1364        Take changes from arrange_for_graph() and set of app labels, and return
1365        a modified set of changes which trims out as many migrations that are
1366        not in package_labels as possible. Note that some other migrations may
1367        still be present as they may be required dependencies.
1368        """
1369        # Gather other app dependencies in a first pass
1370        app_dependencies = {}
1371        for package_label, migrations in changes.items():
1372            for migration in migrations:
1373                for dep_package_label, name in migration.dependencies:
1374                    app_dependencies.setdefault(package_label, set()).add(
1375                        dep_package_label
1376                    )
1377        required_packages = set(package_labels)
1378        # Keep resolving till there's no change
1379        old_required_packages = None
1380        while old_required_packages != required_packages:
1381            old_required_packages = set(required_packages)
1382            required_packages.update(
1383                *[
1384                    app_dependencies.get(package_label, ())
1385                    for package_label in required_packages
1386                ]
1387            )
1388        # Remove all migrations that aren't needed
1389        for package_label in list(changes):
1390            if package_label not in required_packages:
1391                del changes[package_label]
1392        return changes
1393
1394    @classmethod
1395    def parse_number(cls, name: str) -> int | None:
1396        """
1397        Given a migration name, try to extract a number from the beginning of
1398        it. For a squashed migration such as '0001_squashed_0004…', return the
1399        second number. If no number is found, return None.
1400        """
1401        if squashed_match := re.search(r".*_squashed_(\d+)", name):
1402            return int(squashed_match[1])
1403        match = re.match(r"^\d+", name)
1404        if match:
1405            return int(match[0])
1406        return None