Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import functools
   4import re
   5from graphlib import TopologicalSorter
   6from typing import TYPE_CHECKING, Any
   7
   8from plain import models
   9from plain.models.migrations import operations
  10from plain.models.migrations.migration import Migration, SettingsTuple
  11from plain.models.migrations.operations.models import AlterModelOptions
  12from plain.models.migrations.optimizer import MigrationOptimizer
  13from plain.models.migrations.questioner import MigrationQuestioner
  14from plain.models.migrations.utils import (
  15    COMPILED_REGEX_TYPE,
  16    RegexObject,
  17    resolve_relation,
  18)
  19from plain.runtime import settings
  20
  21if TYPE_CHECKING:
  22    from plain.models.fields import Field
  23    from plain.models.migrations.graph import MigrationGraph
  24    from plain.models.migrations.operations.base import Operation
  25    from plain.models.migrations.state import ProjectState
  26
  27
  28class MigrationAutodetector:
  29    """
  30    Take a pair of ProjectStates and compare them to see what the first would
  31    need doing to make it match the second (the second usually being the
  32    project's current state).
  33
  34    Note that this naturally operates on entire projects at a time,
  35    as it's likely that changes interact (for example, you can't
  36    add a ForeignKey without having a migration to add the table it
  37    depends on first). A user interface may offer single-app usage
  38    if it wishes, with the caveat that it may not always be possible.
  39    """
  40
  41    def __init__(
  42        self,
  43        from_state: ProjectState,
  44        to_state: ProjectState,
  45        questioner: MigrationQuestioner | None = None,
  46    ):
  47        self.from_state = from_state
  48        self.to_state = to_state
  49        self.questioner = questioner or MigrationQuestioner()
  50        self.existing_packages = {app for app, model in from_state.models}
  51
  52    def changes(
  53        self,
  54        graph: MigrationGraph,
  55        trim_to_packages: set[str] | None = None,
  56        convert_packages: set[str] | None = None,
  57        migration_name: str | None = None,
  58    ) -> dict[str, list[Migration]]:
  59        """
  60        Main entry point to produce a list of applicable changes.
  61        Take a graph to base names on and an optional set of packages
  62        to try and restrict to (restriction is not guaranteed)
  63        """
  64        changes = self._detect_changes(convert_packages, graph)
  65        changes = self.arrange_for_graph(changes, graph, migration_name)
  66        if trim_to_packages:
  67            changes = self._trim_to_packages(changes, trim_to_packages)
  68        return changes
  69
  70    def deep_deconstruct(self, obj: Any) -> Any:
  71        """
  72        Recursive deconstruction for a field and its arguments.
  73        Used for full comparison for rename/alter; sometimes a single-level
  74        deconstruction will not compare correctly.
  75        """
  76        if isinstance(obj, list):
  77            return [self.deep_deconstruct(value) for value in obj]
  78        elif isinstance(obj, tuple):
  79            return tuple(self.deep_deconstruct(value) for value in obj)
  80        elif isinstance(obj, dict):
  81            return {key: self.deep_deconstruct(value) for key, value in obj.items()}
  82        elif isinstance(obj, functools.partial):
  83            return (
  84                obj.func,
  85                self.deep_deconstruct(obj.args),
  86                self.deep_deconstruct(obj.keywords),
  87            )
  88        elif isinstance(obj, COMPILED_REGEX_TYPE):
  89            return RegexObject(obj)
  90        elif isinstance(obj, type):
  91            # If this is a type that implements 'deconstruct' as an instance method,
  92            # avoid treating this as being deconstructible itself - see #22951
  93            return obj
  94        elif hasattr(obj, "deconstruct"):
  95            deconstructed = obj.deconstruct()
  96            if isinstance(obj, models.Field):
  97                # we have a field which also returns a name
  98                deconstructed = deconstructed[1:]
  99            path, args, kwargs = deconstructed
 100            return (
 101                path,
 102                [self.deep_deconstruct(value) for value in args],
 103                {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
 104            )
 105        else:
 106            return obj
 107
 108    def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
 109        """
 110        Return a definition of the fields that ignores field names and
 111        what related fields actually relate to. Used for detecting renames (as
 112        the related fields change during renames).
 113        """
 114        fields_def = []
 115        for name, field in sorted(fields.items()):
 116            deconstruction = self.deep_deconstruct(field)
 117            if field.remote_field and field.remote_field.model:
 118                deconstruction[2].pop("to", None)
 119            fields_def.append(deconstruction)
 120        return fields_def
 121
 122    def _detect_changes(
 123        self,
 124        convert_packages: set[str] | None = None,
 125        graph: MigrationGraph | None = None,
 126    ) -> dict[str, list[Migration]]:
 127        """
 128        Return a dict of migration plans which will achieve the
 129        change from from_state to to_state. The dict has app labels
 130        as keys and a list of migrations as values.
 131
 132        The resulting migrations aren't specially named, but the names
 133        do matter for dependencies inside the set.
 134
 135        convert_packages is the list of packages to convert to use migrations
 136        (i.e. to make initial migrations for, in the usual case)
 137
 138        graph is an optional argument that, if provided, can help improve
 139        dependency generation and avoid potential circular dependencies.
 140        """
 141        # The first phase is generating all the operations for each app
 142        # and gathering them into a big per-app list.
 143        # Then go through that list, order it, and split into migrations to
 144        # resolve dependencies caused by M2Ms and FKs.
 145        self.generated_operations = {}
 146        self.altered_indexes = {}
 147        self.altered_constraints = {}
 148        self.renamed_fields = {}
 149
 150        # Prepare some old/new state and model lists, ignoring unmigrated packages.
 151        self.old_model_keys = set()
 152        self.new_model_keys = set()
 153        for (package_label, model_name), model_state in self.from_state.models.items():
 154            if package_label not in self.from_state.real_packages:
 155                self.old_model_keys.add((package_label, model_name))
 156
 157        for (package_label, model_name), model_state in self.to_state.models.items():
 158            if package_label not in self.from_state.real_packages or (
 159                convert_packages and package_label in convert_packages
 160            ):
 161                self.new_model_keys.add((package_label, model_name))
 162
 163        self.from_state.resolve_fields_and_relations()
 164        self.to_state.resolve_fields_and_relations()
 165
 166        # Renames have to come first
 167        self.generate_renamed_models()
 168
 169        # Prepare lists of fields and generate through model map
 170        self._prepare_field_lists()
 171        self._generate_through_model_map()
 172
 173        # Generate non-rename model operations
 174        self.generate_deleted_models()
 175        self.generate_created_models()
 176        self.generate_altered_options()
 177        self.generate_altered_db_table_comment()
 178
 179        # Create the renamed fields and store them in self.renamed_fields.
 180        # They are used by create_altered_indexes(), generate_altered_fields(),
 181        # generate_removed_altered_index(), and
 182        # generate_altered_index().
 183        self.create_renamed_fields()
 184        # Create the altered indexes and store them in self.altered_indexes.
 185        # This avoids the same computation in generate_removed_indexes()
 186        # and generate_added_indexes().
 187        self.create_altered_indexes()
 188        self.create_altered_constraints()
 189        # Generate index removal operations before field is removed
 190        self.generate_removed_constraints()
 191        self.generate_removed_indexes()
 192        # Generate field renaming operations.
 193        self.generate_renamed_fields()
 194        self.generate_renamed_indexes()
 195        # Generate field operations.
 196        self.generate_removed_fields()
 197        self.generate_added_fields()
 198        self.generate_altered_fields()
 199        self.generate_added_indexes()
 200        self.generate_added_constraints()
 201        self.generate_altered_db_table()
 202
 203        self._sort_migrations()
 204        self._build_migration_list(graph)
 205        self._optimize_migrations()
 206
 207        return self.migrations
 208
 209    def _prepare_field_lists(self) -> None:
 210        """
 211        Prepare field lists and a list of the fields that used through models
 212        in the old state so dependencies can be made from the through model
 213        deletion to the field that uses it.
 214        """
 215        self.kept_model_keys = self.old_model_keys & self.new_model_keys
 216        self.through_users = {}
 217        self.old_field_keys = {
 218            (package_label, model_name, field_name)
 219            for package_label, model_name in self.kept_model_keys
 220            for field_name in self.from_state.models[
 221                package_label,
 222                self.renamed_models.get((package_label, model_name), model_name),
 223            ].fields
 224        }
 225        self.new_field_keys = {
 226            (package_label, model_name, field_name)
 227            for package_label, model_name in self.kept_model_keys
 228            for field_name in self.to_state.models[package_label, model_name].fields
 229        }
 230
 231    def _generate_through_model_map(self) -> None:
 232        """Through model map generation."""
 233        for package_label, model_name in sorted(self.old_model_keys):
 234            old_model_name = self.renamed_models.get(
 235                (package_label, model_name), model_name
 236            )
 237            old_model_state = self.from_state.models[package_label, old_model_name]
 238            for field_name, field in old_model_state.fields.items():
 239                if hasattr(field, "remote_field") and getattr(
 240                    field.remote_field, "through", None
 241                ):
 242                    through_key = resolve_relation(
 243                        field.remote_field.through, package_label, model_name
 244                    )
 245                    self.through_users[through_key] = (
 246                        package_label,
 247                        old_model_name,
 248                        field_name,
 249                    )
 250
 251    @staticmethod
 252    def _resolve_dependency(
 253        dependency: tuple[str, ...],
 254    ) -> tuple[tuple[str, ...], bool]:
 255        """
 256        Return the resolved dependency and a boolean denoting whether or not
 257        it was a settings dependency.
 258        """
 259        if not isinstance(dependency, SettingsTuple):
 260            return dependency, False
 261        resolved_package_label, resolved_object_name = getattr(
 262            settings, dependency[1]
 263        ).split(".")
 264        return (resolved_package_label, resolved_object_name.lower()) + dependency[
 265            2:
 266        ], True
 267
    def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
        """
        Chop the lists of operations up into migrations with dependencies on
        each other. Do this by going through an app's list of operations until
        one is found that has an outgoing dependency that isn't in another
        app's migration yet (hasn't been chopped off its list). Then chop off
        the operations before it into a migration and move onto the next app.
        If the loop completes without doing anything, there's a circular
        dependency (which _should_ be impossible as the operations are
        all split at this point so they can't depend and be depended on).

        The result is stored in self.migrations, a dict mapping each package
        label to the list of Migration instances built for it. Operations are
        consumed from self.generated_operations as they are placed.

        graph, when given, is consulted for the leaf node of an external
        package that has no migration generated in this run.

        Raises ValueError if two consecutive passes (the second one in chop
        mode) place no operation at all.
        """
        self.migrations = {}
        # Total number of operations still waiting to be placed in a migration.
        num_ops = sum(len(x) for x in self.generated_operations.values())
        # Turned on after a full pass that made no progress; turned off again
        # as soon as a migration is created.
        chop_mode = False
        while num_ops:
            # On every iteration, we step through all the packages and see if there
            # is a completed set of operations.
            # If we find that a subset of the operations are complete we can
            # try to chop it off from the rest and continue, but we only
            # do this if we've already been through the list once before
            # without any chopping and nothing has changed.
            for package_label in sorted(self.generated_operations):
                # Leading operations of this package whose dependencies are
                # all satisfied, plus the migration dependencies they imply.
                chopped = []
                dependencies = set()
                # Iterate over a copy: the live list is shortened below.
                for operation in list(self.generated_operations[package_label]):
                    deps_satisfied = True
                    operation_dependencies = set()
                    for dep in operation._auto_deps:
                        # Temporarily resolve the settings dependency to
                        # prevent circular references. While keeping the
                        # dependency checks on the resolved model, add the
                        # settings dependencies.
                        original_dep = dep
                        dep, is_settings_dep = self._resolve_dependency(dep)
                        if dep[0] != package_label:
                            # External app dependency. See if it's not yet
                            # satisfied.
                            for other_operation in self.generated_operations.get(
                                dep[0], []
                            ):
                                if self.check_dependency(other_operation, dep):
                                    deps_satisfied = False
                                    break
                            if not deps_satisfied:
                                break
                            else:
                                if is_settings_dep:
                                    # Keep the dependency expressed through
                                    # the setting rather than the resolved
                                    # package.
                                    operation_dependencies.add(
                                        (original_dep[0], original_dep[1])
                                    )
                                elif dep[0] in self.migrations:
                                    # Depend on the most recent migration
                                    # generated for that package in this run.
                                    operation_dependencies.add(
                                        (dep[0], self.migrations[dep[0]][-1].name)
                                    )
                                else:
                                    # If we can't find the other app, we add a
                                    # first/last dependency, but only if we've
                                    # already been through once and checked
                                    # everything.
                                    if chop_mode:
                                        # If the app already exists, we add a
                                        # dependency on the last migration, as
                                        # we don't know which migration
                                        # contains the target field. If it's
                                        # not yet migrated or has no
                                        # migrations, we use __first__.
                                        if graph and graph.leaf_nodes(dep[0]):
                                            operation_dependencies.add(
                                                graph.leaf_nodes(dep[0])[0]
                                            )
                                        else:
                                            operation_dependencies.add(
                                                (dep[0], "__first__")
                                            )
                                    else:
                                        # No break here: the remaining
                                        # dependencies of this operation are
                                        # still examined.
                                        deps_satisfied = False
                    if deps_satisfied:
                        chopped.append(operation)
                        dependencies.update(operation_dependencies)
                        # The operation just accepted is always the current
                        # head of the live list, because the loop stops at
                        # the first operation that is not satisfied.
                        del self.generated_operations[package_label][0]
                    else:
                        break
                # Make a migration! Well, only if there's stuff to put in it
                if dependencies or chopped:
                    # Only emit a migration when the package's list was fully
                    # consumed, or when chop mode allows a partial one.
                    if not self.generated_operations[package_label] or chop_mode:
                        subclass = type(
                            "Migration",
                            (Migration,),
                            {"operations": [], "dependencies": []},
                        )
                        # Temporary name "auto_<n>", numbered per package.
                        instance = subclass(
                            "auto_%i"  # noqa: UP031
                            % (len(self.migrations.get(package_label, [])) + 1),
                            package_label,
                        )
                        instance.dependencies = list(dependencies)
                        instance.operations = chopped
                        instance.initial = package_label not in self.existing_packages
                        self.migrations.setdefault(package_label, []).append(instance)
                        chop_mode = False
                    else:
                        # Not allowed to split yet: put the accepted
                        # operations back at the front of the list.
                        self.generated_operations[package_label] = (
                            chopped + self.generated_operations[package_label]
                        )
            new_num_ops = sum(len(x) for x in self.generated_operations.values())
            if new_num_ops == num_ops:
                # A whole pass placed nothing. Retry once in chop mode; if
                # that pass also places nothing, give up.
                if not chop_mode:
                    chop_mode = True
                else:
                    raise ValueError(
                        f"Cannot resolve operation dependencies: {self.generated_operations!r}"
                    )
            num_ops = new_num_ops
 381
 382    def _sort_migrations(self) -> None:
 383        """
 384        Reorder to make things possible. Reordering may be needed so FKs work
 385        nicely inside the same app.
 386        """
 387        for package_label, ops in sorted(self.generated_operations.items()):
 388            ts = TopologicalSorter()
 389            for op in ops:
 390                ts.add(op)
 391                for dep in op._auto_deps:
 392                    # Resolve intra-app dependencies to handle circular
 393                    # references involving a settings model.
 394                    dep = self._resolve_dependency(dep)[0]
 395                    if dep[0] != package_label:
 396                        continue
 397                    ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
 398            self.generated_operations[package_label] = list(ts.static_order())
 399
 400    def _optimize_migrations(self) -> None:
 401        # Add in internal dependencies among the migrations
 402        for package_label, migrations in self.migrations.items():
 403            for m1, m2 in zip(migrations, migrations[1:]):
 404                m2.dependencies.append((package_label, m1.name))
 405
 406        # De-dupe dependencies
 407        for migrations in self.migrations.values():
 408            for migration in migrations:
 409                migration.dependencies = list(set(migration.dependencies))
 410
 411        # Optimize migrations
 412        for package_label, migrations in self.migrations.items():
 413            for migration in migrations:
 414                migration.operations = MigrationOptimizer().optimize(
 415                    migration.operations, package_label
 416                )
 417
 418    def check_dependency(
 419        self, operation: Operation, dependency: tuple[str, ...]
 420    ) -> bool:
 421        """
 422        Return True if the given operation depends on the given dependency,
 423        False otherwise.
 424        """
 425        # Created model
 426        if dependency[2] is None and dependency[3] is True:
 427            return (
 428                isinstance(operation, operations.CreateModel)
 429                and operation.name_lower == dependency[1].lower()
 430            )
 431        # Created field
 432        elif dependency[2] is not None and dependency[3] is True:
 433            return (
 434                isinstance(operation, operations.CreateModel)
 435                and operation.name_lower == dependency[1].lower()
 436                and any(dependency[2] == x for x, y in operation.fields)
 437            ) or (
 438                isinstance(operation, operations.AddField)
 439                and operation.model_name_lower == dependency[1].lower()
 440                and operation.name_lower == dependency[2].lower()
 441            )
 442        # Removed field
 443        elif dependency[2] is not None and dependency[3] is False:
 444            return (
 445                isinstance(operation, operations.RemoveField)
 446                and operation.model_name_lower == dependency[1].lower()
 447                and operation.name_lower == dependency[2].lower()
 448            )
 449        # Removed model
 450        elif dependency[2] is None and dependency[3] is False:
 451            return (
 452                isinstance(operation, operations.DeleteModel)
 453                and operation.name_lower == dependency[1].lower()
 454            )
 455        # Field being altered
 456        elif dependency[2] is not None and dependency[3] == "alter":
 457            return (
 458                isinstance(operation, operations.AlterField)
 459                and operation.model_name_lower == dependency[1].lower()
 460                and operation.name_lower == dependency[2].lower()
 461            )
 462        # Unknown dependency. Raise an error.
 463        else:
 464            raise ValueError(f"Can't handle dependency {dependency!r}")
 465
 466    def add_operation(
 467        self,
 468        package_label: str,
 469        operation: Operation,
 470        dependencies: list[tuple[str, ...]] | None = None,
 471        beginning: bool = False,
 472    ) -> None:
 473        # Dependencies are
 474        # (package_label, model_name, field_name, create/delete as True/False)
 475        operation._auto_deps = dependencies or []  # type: ignore[attr-defined]
 476        if beginning:
 477            self.generated_operations.setdefault(package_label, []).insert(0, operation)
 478        else:
 479            self.generated_operations.setdefault(package_label, []).append(operation)
 480
 481    def generate_renamed_models(self) -> None:
 482        """
 483        Find any renamed models, generate the operations for them, and remove
 484        the old entry from the model lists. Must be run before other
 485        model-level generation.
 486        """
 487        self.renamed_models = {}
 488        self.renamed_models_rel = {}
 489        added_models = self.new_model_keys - self.old_model_keys
 490        for package_label, model_name in sorted(added_models):
 491            model_state = self.to_state.models[package_label, model_name]
 492            model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
 493
 494            removed_models = self.old_model_keys - self.new_model_keys
 495            for rem_package_label, rem_model_name in removed_models:
 496                if rem_package_label == package_label:
 497                    rem_model_state = self.from_state.models[
 498                        rem_package_label, rem_model_name
 499                    ]
 500                    rem_model_fields_def = self.only_relation_agnostic_fields(
 501                        rem_model_state.fields
 502                    )
 503                    if model_fields_def == rem_model_fields_def:
 504                        if self.questioner.ask_rename_model(
 505                            rem_model_state, model_state
 506                        ):
 507                            dependencies = []
 508                            fields = list(model_state.fields.values()) + [
 509                                field.remote_field
 510                                for relations in self.to_state.relations[
 511                                    package_label, model_name
 512                                ].values()
 513                                for field in relations.values()
 514                            ]
 515                            for field in fields:
 516                                if field.is_relation:
 517                                    dependencies.extend(
 518                                        self._get_dependencies_for_foreign_key(
 519                                            package_label,
 520                                            model_name,
 521                                            field,
 522                                            self.to_state,
 523                                        )
 524                                    )
 525                            self.add_operation(
 526                                package_label,
 527                                operations.RenameModel(
 528                                    old_name=rem_model_state.name,
 529                                    new_name=model_state.name,
 530                                ),
 531                                dependencies=dependencies,
 532                            )
 533                            self.renamed_models[package_label, model_name] = (
 534                                rem_model_name
 535                            )
 536                            renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
 537                            self.renamed_models_rel[renamed_models_rel_key] = (
 538                                f"{model_state.package_label}.{model_state.name_lower}"
 539                            )
 540                            self.old_model_keys.remove(
 541                                (rem_package_label, rem_model_name)
 542                            )
 543                            self.old_model_keys.add((package_label, model_name))
 544                            break
 545
 546    def generate_created_models(self) -> None:
 547        """
 548        Find all new models and make create
 549        operations for them as well as separate operations to create any
 550        foreign key or M2M relationships (these are optimized later, if
 551        possible).
 552
 553        Defer any model options that refer to collections of fields that might
 554        be deferred.
 555        """
 556        added_models = self.new_model_keys - self.old_model_keys
 557
 558        for package_label, model_name in added_models:
 559            model_state = self.to_state.models[package_label, model_name]
 560            # Gather related fields
 561            related_fields = {}
 562            primary_key_rel = None
 563            for field_name, field in model_state.fields.items():
 564                if field.remote_field:
 565                    if field.remote_field.model:
 566                        if field.primary_key:
 567                            primary_key_rel = field.remote_field.model
 568                        else:
 569                            related_fields[field_name] = field
 570                    if getattr(field.remote_field, "through", None):
 571                        related_fields[field_name] = field
 572
 573            # Are there indexes to defer?
 574            indexes = model_state.options.pop("indexes")
 575            constraints = model_state.options.pop("constraints")
 576            # Depend on the deletion of any possible proxy version of us
 577            dependencies = [
 578                (package_label, model_name, None, False),
 579            ]
 580            # Depend on all bases
 581            for base in model_state.bases:
 582                if isinstance(base, str) and "." in base:
 583                    base_package_label, base_name = base.split(".", 1)
 584                    dependencies.append((base_package_label, base_name, None, True))
 585                    # Depend on the removal of base fields if the new model has
 586                    # a field with the same name.
 587                    old_base_model_state = self.from_state.models.get(
 588                        (base_package_label, base_name)
 589                    )
 590                    new_base_model_state = self.to_state.models.get(
 591                        (base_package_label, base_name)
 592                    )
 593                    if old_base_model_state and new_base_model_state:
 594                        removed_base_fields = (
 595                            set(old_base_model_state.fields)
 596                            .difference(
 597                                new_base_model_state.fields,
 598                            )
 599                            .intersection(model_state.fields)
 600                        )
 601                        for removed_base_field in removed_base_fields:
 602                            dependencies.append(
 603                                (
 604                                    base_package_label,
 605                                    base_name,
 606                                    removed_base_field,
 607                                    False,
 608                                )
 609                            )
 610            # Depend on the other end of the primary key if it's a relation
 611            if primary_key_rel:
 612                dependencies.append(
 613                    resolve_relation(
 614                        primary_key_rel,
 615                        package_label,
 616                        model_name,
 617                    )
 618                    + (None, True)
 619                )
 620            # Generate creation operation
 621            self.add_operation(
 622                package_label,
 623                operations.CreateModel(
 624                    name=model_state.name,
 625                    fields=[
 626                        d
 627                        for d in model_state.fields.items()
 628                        if d[0] not in related_fields
 629                    ],
 630                    options=model_state.options,
 631                    bases=model_state.bases,
 632                ),
 633                dependencies=dependencies,
 634                beginning=True,
 635            )
 636
 637            # Generate operations for each related field
 638            for name, field in sorted(related_fields.items()):
 639                dependencies = self._get_dependencies_for_foreign_key(
 640                    package_label,
 641                    model_name,
 642                    field,
 643                    self.to_state,
 644                )
 645                # Depend on our own model being created
 646                dependencies.append((package_label, model_name, None, True))
 647                # Make operation
 648                self.add_operation(
 649                    package_label,
 650                    operations.AddField(
 651                        model_name=model_name,
 652                        name=name,
 653                        field=field,
 654                    ),
 655                    dependencies=list(set(dependencies)),
 656                )
 657
 658            related_dependencies = [
 659                (package_label, model_name, name, True)
 660                for name in sorted(related_fields)
 661            ]
 662            related_dependencies.append((package_label, model_name, None, True))
 663            for index in indexes:
 664                self.add_operation(
 665                    package_label,
 666                    operations.AddIndex(
 667                        model_name=model_name,
 668                        index=index,
 669                    ),
 670                    dependencies=related_dependencies,
 671                )
 672            for constraint in constraints:
 673                self.add_operation(
 674                    package_label,
 675                    operations.AddConstraint(
 676                        model_name=model_name,
 677                        constraint=constraint,
 678                    ),
 679                    dependencies=related_dependencies,
 680                )
 681
 682    def generate_deleted_models(self) -> None:
 683        """
 684        Find all deleted models and make delete
 685        operations for them as well as separate operations to delete any
 686        foreign key or M2M relationships (these are optimized later, if
 687        possible).
 688
 689        Also bring forward removal of any model options that refer to
 690        collections of fields - the inverse of generate_created_models().
 691        """
 692        deleted_models = self.old_model_keys - self.new_model_keys
 693
 694        for package_label, model_name in sorted(deleted_models):
 695            model_state = self.from_state.models[package_label, model_name]
 696            # Gather related fields
 697            related_fields = {}
 698            for field_name, field in model_state.fields.items():
 699                if field.remote_field:
 700                    if field.remote_field.model:
 701                        related_fields[field_name] = field
 702                    if getattr(field.remote_field, "through", None):
 703                        related_fields[field_name] = field
 704
 705            # Then remove each related field
 706            for name in sorted(related_fields):
 707                self.add_operation(
 708                    package_label,
 709                    operations.RemoveField(
 710                        model_name=model_name,
 711                        name=name,
 712                    ),
 713                )
 714            # Finally, remove the model.
 715            # This depends on both the removal/alteration of all incoming fields
 716            # and the removal of all its own related fields, and if it's
 717            # a through model the field that references it.
 718            dependencies = []
 719            relations = self.from_state.relations
 720            for (
 721                related_object_package_label,
 722                object_name,
 723            ), relation_related_fields in relations[package_label, model_name].items():
 724                for field_name, field in relation_related_fields.items():
 725                    dependencies.append(
 726                        (related_object_package_label, object_name, field_name, False),
 727                    )
 728                    if not field.many_to_many:
 729                        dependencies.append(
 730                            (
 731                                related_object_package_label,
 732                                object_name,
 733                                field_name,
 734                                "alter",
 735                            ),
 736                        )
 737
 738            for name in sorted(related_fields):
 739                dependencies.append((package_label, model_name, name, False))
 740            # We're referenced in another field's through=
 741            through_user = self.through_users.get(
 742                (package_label, model_state.name_lower)
 743            )
 744            if through_user:
 745                dependencies.append(
 746                    (through_user[0], through_user[1], through_user[2], False)
 747                )
 748            # Finally, make the operation, deduping any dependencies
 749            self.add_operation(
 750                package_label,
 751                operations.DeleteModel(
 752                    name=model_state.name,
 753                ),
 754                dependencies=list(set(dependencies)),
 755            )
 756
 757    def create_renamed_fields(self) -> None:
 758        """Work out renamed fields."""
 759        self.renamed_operations = []
 760        old_field_keys = self.old_field_keys.copy()
 761        for package_label, model_name, field_name in sorted(
 762            self.new_field_keys - old_field_keys
 763        ):
 764            old_model_name = self.renamed_models.get(
 765                (package_label, model_name), model_name
 766            )
 767            old_model_state = self.from_state.models[package_label, old_model_name]
 768            new_model_state = self.to_state.models[package_label, model_name]
 769            field = new_model_state.get_field(field_name)
 770            # Scan to see if this is actually a rename!
 771            field_dec = self.deep_deconstruct(field)
 772            for rem_package_label, rem_model_name, rem_field_name in sorted(
 773                old_field_keys - self.new_field_keys
 774            ):
 775                if rem_package_label == package_label and rem_model_name == model_name:
 776                    old_field = old_model_state.get_field(rem_field_name)
 777                    old_field_dec = self.deep_deconstruct(old_field)
 778                    if (
 779                        field.remote_field
 780                        and field.remote_field.model
 781                        and "to" in old_field_dec[2]
 782                    ):
 783                        old_rel_to = old_field_dec[2]["to"]
 784                        if old_rel_to in self.renamed_models_rel:
 785                            old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
 786                    old_field.set_attributes_from_name(rem_field_name)
 787                    old_db_column = old_field.get_attname_column()[1]
 788                    if old_field_dec == field_dec or (
 789                        # Was the field renamed and db_column equal to the
 790                        # old field's column added?
 791                        old_field_dec[0:2] == field_dec[0:2]
 792                        and dict(old_field_dec[2], db_column=old_db_column)
 793                        == field_dec[2]
 794                    ):
 795                        if self.questioner.ask_rename(
 796                            model_name, rem_field_name, field_name, field
 797                        ):
 798                            self.renamed_operations.append(
 799                                (
 800                                    rem_package_label,
 801                                    rem_model_name,
 802                                    old_field.db_column,
 803                                    rem_field_name,
 804                                    package_label,
 805                                    model_name,
 806                                    field,
 807                                    field_name,
 808                                )
 809                            )
 810                            old_field_keys.remove(
 811                                (rem_package_label, rem_model_name, rem_field_name)
 812                            )
 813                            old_field_keys.add((package_label, model_name, field_name))
 814                            self.renamed_fields[
 815                                package_label, model_name, field_name
 816                            ] = rem_field_name
 817                            break
 818
 819    def generate_renamed_fields(self) -> None:
 820        """Generate RenameField operations."""
 821        for (
 822            rem_package_label,
 823            rem_model_name,
 824            rem_db_column,
 825            rem_field_name,
 826            package_label,
 827            model_name,
 828            field,
 829            field_name,
 830        ) in self.renamed_operations:
 831            # A db_column mismatch requires a prior noop AlterField for the
 832            # subsequent RenameField to be a noop on attempts at preserving the
 833            # old name.
 834            if rem_db_column != field.db_column:
 835                altered_field = field.clone()
 836                altered_field.name = rem_field_name
 837                self.add_operation(
 838                    package_label,
 839                    operations.AlterField(
 840                        model_name=model_name,
 841                        name=rem_field_name,
 842                        field=altered_field,
 843                    ),
 844                )
 845            self.add_operation(
 846                package_label,
 847                operations.RenameField(
 848                    model_name=model_name,
 849                    old_name=rem_field_name,
 850                    new_name=field_name,
 851                ),
 852            )
 853            self.old_field_keys.remove(
 854                (rem_package_label, rem_model_name, rem_field_name)
 855            )
 856            self.old_field_keys.add((package_label, model_name, field_name))
 857
 858    def generate_added_fields(self) -> None:
 859        """Make AddField operations."""
 860        for package_label, model_name, field_name in sorted(
 861            self.new_field_keys - self.old_field_keys
 862        ):
 863            self._generate_added_field(package_label, model_name, field_name)
 864
 865    def _generate_added_field(
 866        self, package_label: str, model_name: str, field_name: str
 867    ) -> None:
 868        field = self.to_state.models[package_label, model_name].get_field(field_name)
 869        # Adding a field always depends at least on its removal.
 870        dependencies = [(package_label, model_name, field_name, False)]
 871        # Fields that are foreignkeys/m2ms depend on stuff.
 872        if field.remote_field and field.remote_field.model:
 873            dependencies.extend(
 874                self._get_dependencies_for_foreign_key(
 875                    package_label,
 876                    model_name,
 877                    field,
 878                    self.to_state,
 879                )
 880            )
 881        # You can't just add NOT NULL fields with no default or fields
 882        # which don't allow empty strings as default.
 883        time_fields = (models.DateField, models.DateTimeField, models.TimeField)
 884        preserve_default = (
 885            field.allow_null
 886            or field.has_default()
 887            or field.many_to_many
 888            or (not field.required and field.empty_strings_allowed)
 889            or (isinstance(field, time_fields) and field.auto_now)
 890        )
 891        if not preserve_default:
 892            field = field.clone()
 893            if isinstance(field, time_fields) and field.auto_now_add:
 894                field.default = self.questioner.ask_auto_now_add_addition(
 895                    field_name, model_name
 896                )
 897            else:
 898                field.default = self.questioner.ask_not_null_addition(
 899                    field_name, model_name
 900                )
 901        if (
 902            field.primary_key
 903            and field.default is not models.NOT_PROVIDED
 904            and callable(field.default)
 905        ):
 906            self.questioner.ask_unique_callable_default_addition(field_name, model_name)
 907        self.add_operation(
 908            package_label,
 909            operations.AddField(
 910                model_name=model_name,
 911                name=field_name,
 912                field=field,
 913                preserve_default=preserve_default,
 914            ),
 915            dependencies=dependencies,
 916        )
 917
 918    def generate_removed_fields(self) -> None:
 919        """Make RemoveField operations."""
 920        for package_label, model_name, field_name in sorted(
 921            self.old_field_keys - self.new_field_keys
 922        ):
 923            self._generate_removed_field(package_label, model_name, field_name)
 924
 925    def _generate_removed_field(
 926        self, package_label: str, model_name: str, field_name: str
 927    ) -> None:
 928        self.add_operation(
 929            package_label,
 930            operations.RemoveField(
 931                model_name=model_name,
 932                name=field_name,
 933            ),
 934        )
 935
    def generate_altered_fields(self) -> None:
        """
        Make AlterField operations, or possibly RemoveField/AddField if alter
        isn't possible.

        Every field key present in both the old and the new field key sets is
        compared by deep deconstruction. Relation targets and ``through``
        models that only differ because of a detected model (or target field)
        rename are reset to the old value before comparing, so that renames do
        not show up as alterations.
        """
        for package_label, model_name, field_name in sorted(
            self.old_field_keys & self.new_field_keys
        ):
            # Did the field change?
            # Look the old field up under its pre-rename model and field names.
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_field_name = self.renamed_fields.get(
                (package_label, model_name, field_name), field_name
            )
            old_field = self.from_state.models[package_label, old_model_name].get_field(
                old_field_name
            )
            new_field = self.to_state.models[package_label, model_name].get_field(
                field_name
            )
            dependencies = []
            # Implement any model renames on relations; these are handled by RenameModel
            # so we need to exclude them from the comparison
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "model", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.model, package_label, model_name
                )
                if rename_key in self.renamed_models:
                    # NOTE(review): this assigns onto the field object returned
                    # by to_state, not onto a copy.
                    new_field.remote_field.model = old_field.remote_field.model
                # Handle ForeignKey which can only have a single to_field.
                remote_field_name = getattr(new_field.remote_field, "field_name", None)
                if remote_field_name:
                    to_field_rename_key = rename_key + (remote_field_name,)
                    if to_field_rename_key in self.renamed_fields:
                        # Repoint model name only
                        new_field.remote_field.model = old_field.remote_field.model
                # An alteration of a relational field depends on its target
                # model (and through model, if any).
                dependencies.extend(
                    self._get_dependencies_for_foreign_key(
                        package_label,
                        model_name,
                        new_field,
                        self.to_state,
                    )
                )
            # Same treatment for a renamed through model.
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "through", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.through, package_label, model_name
                )
                if rename_key in self.renamed_models:
                    new_field.remote_field.through = old_field.remote_field.through
            old_field_dec = self.deep_deconstruct(old_field)
            new_field_dec = self.deep_deconstruct(new_field)
            # If the field was confirmed to be renamed it means that only
            # db_column was allowed to change which generate_renamed_fields()
            # already accounts for by adding an AlterField operation.
            if old_field_dec != new_field_dec and old_field_name == field_name:
                both_m2m = old_field.many_to_many and new_field.many_to_many
                neither_m2m = not old_field.many_to_many and not new_field.many_to_many
                if both_m2m or neither_m2m:
                    # Either both fields are m2m or neither is
                    preserve_default = True
                    # Going from nullable to non-nullable without a default:
                    # ask the questioner for a default to use.
                    if (
                        old_field.allow_null
                        and not new_field.allow_null
                        and not new_field.has_default()
                        and not new_field.many_to_many
                    ):
                        # Clone so the provided default is not written onto
                        # the field held by to_state.
                        field = new_field.clone()
                        new_default = self.questioner.ask_not_null_alteration(
                            field_name, model_name
                        )
                        # NOT_PROVIDED means no default was supplied; the
                        # field is then altered as declared.
                        if new_default is not models.NOT_PROVIDED:
                            field.default = new_default
                            preserve_default = False
                    else:
                        field = new_field
                    self.add_operation(
                        package_label,
                        operations.AlterField(
                            model_name=model_name,
                            name=field_name,
                            field=field,
                            preserve_default=preserve_default,
                        ),
                        dependencies=dependencies,
                    )
                else:
                    # We cannot alter between m2m and concrete fields
                    self._generate_removed_field(package_label, model_name, field_name)
                    self._generate_added_field(package_label, model_name, field_name)
1031
1032    def create_altered_indexes(self) -> None:
1033        option_name = operations.AddIndex.option_name
1034
1035        for package_label, model_name in sorted(self.kept_model_keys):
1036            old_model_name = self.renamed_models.get(
1037                (package_label, model_name), model_name
1038            )
1039            old_model_state = self.from_state.models[package_label, old_model_name]
1040            new_model_state = self.to_state.models[package_label, model_name]
1041
1042            old_indexes = old_model_state.options[option_name]
1043            new_indexes = new_model_state.options[option_name]
1044            added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1045            removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1046            renamed_indexes = []
1047            # Find renamed indexes.
1048            remove_from_added = []
1049            remove_from_removed = []
1050            for new_index in added_indexes:
1051                new_index_dec = new_index.deconstruct()
1052                new_index_name = new_index_dec[2].pop("name")
1053                for old_index in removed_indexes:
1054                    old_index_dec = old_index.deconstruct()
1055                    old_index_name = old_index_dec[2].pop("name")
1056                    # Indexes are the same except for the names.
1057                    if (
1058                        new_index_dec == old_index_dec
1059                        and new_index_name != old_index_name
1060                    ):
1061                        renamed_indexes.append((old_index_name, new_index_name, None))
1062                        remove_from_added.append(new_index)
1063                        remove_from_removed.append(old_index)
1064
1065            # Remove renamed indexes from the lists of added and removed
1066            # indexes.
1067            added_indexes = [
1068                idx for idx in added_indexes if idx not in remove_from_added
1069            ]
1070            removed_indexes = [
1071                idx for idx in removed_indexes if idx not in remove_from_removed
1072            ]
1073
1074            self.altered_indexes.update(
1075                {
1076                    (package_label, model_name): {
1077                        "added_indexes": added_indexes,
1078                        "removed_indexes": removed_indexes,
1079                        "renamed_indexes": renamed_indexes,
1080                    }
1081                }
1082            )
1083
1084    def generate_added_indexes(self) -> None:
1085        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1086            dependencies = self._get_dependencies_for_model(package_label, model_name)
1087            for index in alt_indexes["added_indexes"]:
1088                self.add_operation(
1089                    package_label,
1090                    operations.AddIndex(
1091                        model_name=model_name,
1092                        index=index,
1093                    ),
1094                    dependencies=dependencies,
1095                )
1096
1097    def generate_removed_indexes(self) -> None:
1098        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1099            for index in alt_indexes["removed_indexes"]:
1100                self.add_operation(
1101                    package_label,
1102                    operations.RemoveIndex(
1103                        model_name=model_name,
1104                        name=index.name,
1105                    ),
1106                )
1107
1108    def generate_renamed_indexes(self) -> None:
1109        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1110            for old_index_name, new_index_name, old_fields in alt_indexes[
1111                "renamed_indexes"
1112            ]:
1113                self.add_operation(
1114                    package_label,
1115                    operations.RenameIndex(
1116                        model_name=model_name,
1117                        new_name=new_index_name,
1118                        old_name=old_index_name,
1119                        old_fields=old_fields,
1120                    ),
1121                )
1122
1123    def create_altered_constraints(self) -> None:
1124        option_name = operations.AddConstraint.option_name
1125        for package_label, model_name in sorted(self.kept_model_keys):
1126            old_model_name = self.renamed_models.get(
1127                (package_label, model_name), model_name
1128            )
1129            old_model_state = self.from_state.models[package_label, old_model_name]
1130            new_model_state = self.to_state.models[package_label, model_name]
1131
1132            old_constraints = old_model_state.options[option_name]
1133            new_constraints = new_model_state.options[option_name]
1134            add_constraints = [c for c in new_constraints if c not in old_constraints]
1135            rem_constraints = [c for c in old_constraints if c not in new_constraints]
1136
1137            self.altered_constraints.update(
1138                {
1139                    (package_label, model_name): {
1140                        "added_constraints": add_constraints,
1141                        "removed_constraints": rem_constraints,
1142                    }
1143                }
1144            )
1145
1146    def generate_added_constraints(self) -> None:
1147        for (
1148            package_label,
1149            model_name,
1150        ), alt_constraints in self.altered_constraints.items():
1151            dependencies = self._get_dependencies_for_model(package_label, model_name)
1152            for constraint in alt_constraints["added_constraints"]:
1153                self.add_operation(
1154                    package_label,
1155                    operations.AddConstraint(
1156                        model_name=model_name,
1157                        constraint=constraint,
1158                    ),
1159                    dependencies=dependencies,
1160                )
1161
1162    def generate_removed_constraints(self) -> None:
1163        for (
1164            package_label,
1165            model_name,
1166        ), alt_constraints in self.altered_constraints.items():
1167            for constraint in alt_constraints["removed_constraints"]:
1168                self.add_operation(
1169                    package_label,
1170                    operations.RemoveConstraint(
1171                        model_name=model_name,
1172                        name=constraint.name,
1173                    ),
1174                )
1175
1176    @staticmethod
1177    def _get_dependencies_for_foreign_key(
1178        package_label: str, model_name: str, field: Field, project_state: ProjectState
1179    ) -> list[tuple[str, str, None, bool]]:
1180        remote_field_model = None
1181        if hasattr(field.remote_field, "model"):
1182            remote_field_model = field.remote_field.model
1183        else:
1184            relations = project_state.relations[package_label, model_name]
1185            for (remote_package_label, remote_model_name), fields in relations.items():
1186                if any(
1187                    field == related_field.remote_field
1188                    for related_field in fields.values()
1189                ):
1190                    remote_field_model = f"{remote_package_label}.{remote_model_name}"
1191                    break
1192        dep_package_label, dep_object_name = resolve_relation(
1193            remote_field_model,
1194            package_label,
1195            model_name,
1196        )
1197        dependencies = [(dep_package_label, dep_object_name, None, True)]
1198        if getattr(field.remote_field, "through", None):
1199            through_package_label, through_object_name = resolve_relation(
1200                field.remote_field.through,  # type: ignore[attr-defined]
1201                package_label,
1202                model_name,
1203            )
1204            dependencies.append(
1205                (through_package_label, through_object_name, None, True)
1206            )
1207        return dependencies
1208
1209    def _get_dependencies_for_model(
1210        self, package_label: str, model_name: str
1211    ) -> list[tuple[str, str, None, bool]]:
1212        """Return foreign key dependencies of the given model."""
1213        dependencies = []
1214        model_state = self.to_state.models[package_label, model_name]
1215        for field in model_state.fields.values():
1216            if field.is_relation:
1217                dependencies.extend(
1218                    self._get_dependencies_for_foreign_key(
1219                        package_label,
1220                        model_name,
1221                        field,
1222                        self.to_state,
1223                    )
1224                )
1225        return dependencies
1226
1227    def generate_altered_db_table(self) -> None:
1228        for package_label, model_name in sorted(self.kept_model_keys):
1229            old_model_name = self.renamed_models.get(
1230                (package_label, model_name), model_name
1231            )
1232            old_model_state = self.from_state.models[package_label, old_model_name]
1233            new_model_state = self.to_state.models[package_label, model_name]
1234            old_db_table_name = old_model_state.options.get("db_table")
1235            new_db_table_name = new_model_state.options.get("db_table")
1236            if old_db_table_name != new_db_table_name:
1237                self.add_operation(
1238                    package_label,
1239                    operations.AlterModelTable(
1240                        name=model_name,
1241                        table=new_db_table_name,
1242                    ),
1243                )
1244
1245    def generate_altered_db_table_comment(self) -> None:
1246        for package_label, model_name in sorted(self.kept_model_keys):
1247            old_model_name = self.renamed_models.get(
1248                (package_label, model_name), model_name
1249            )
1250            old_model_state = self.from_state.models[package_label, old_model_name]
1251            new_model_state = self.to_state.models[package_label, model_name]
1252
1253            old_db_table_comment = old_model_state.options.get("db_table_comment")
1254            new_db_table_comment = new_model_state.options.get("db_table_comment")
1255            if old_db_table_comment != new_db_table_comment:
1256                self.add_operation(
1257                    package_label,
1258                    operations.AlterModelTableComment(
1259                        name=model_name,
1260                        table_comment=new_db_table_comment,
1261                    ),
1262                )
1263
1264    def generate_altered_options(self) -> None:
1265        """
1266        Work out if any non-schema-affecting options have changed and make an
1267        operation to represent them in state changes (in case Python code in
1268        migrations needs them).
1269        """
1270        for package_label, model_name in sorted(self.kept_model_keys):
1271            old_model_name = self.renamed_models.get(
1272                (package_label, model_name), model_name
1273            )
1274            old_model_state = self.from_state.models[package_label, old_model_name]
1275            new_model_state = self.to_state.models[package_label, model_name]
1276            old_options = {
1277                key: value
1278                for key, value in old_model_state.options.items()
1279                if key in AlterModelOptions.ALTER_OPTION_KEYS
1280            }
1281            new_options = {
1282                key: value
1283                for key, value in new_model_state.options.items()
1284                if key in AlterModelOptions.ALTER_OPTION_KEYS
1285            }
1286            if old_options != new_options:
1287                self.add_operation(
1288                    package_label,
1289                    operations.AlterModelOptions(
1290                        name=model_name,
1291                        options=new_options,
1292                    ),
1293                )
1294
1295    def arrange_for_graph(
1296        self,
1297        changes: dict[str, list[Migration]],
1298        graph: MigrationGraph,
1299        migration_name: str | None = None,
1300    ) -> dict[str, list[Migration]]:
1301        """
1302        Take a result from changes() and a MigrationGraph, and fix the names
1303        and dependencies of the changes so they extend the graph from the leaf
1304        nodes for each app.
1305        """
1306        leaves = graph.leaf_nodes()
1307        name_map = {}
1308        for package_label, migrations in list(changes.items()):
1309            if not migrations:
1310                continue
1311            # Find the app label's current leaf node
1312            app_leaf = None
1313            for leaf in leaves:
1314                if leaf[0] == package_label:
1315                    app_leaf = leaf
1316                    break
1317            # Do they want an initial migration for this app?
1318            if app_leaf is None and not self.questioner.ask_initial(package_label):
1319                # They don't.
1320                for migration in migrations:
1321                    name_map[(package_label, migration.name)] = (
1322                        package_label,
1323                        "__first__",
1324                    )
1325                del changes[package_label]
1326                continue
1327            # Work out the next number in the sequence
1328            if app_leaf is None:
1329                next_number = 1
1330            else:
1331                next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1332            # Name each migration
1333            for i, migration in enumerate(migrations):
1334                if i == 0 and app_leaf:
1335                    migration.dependencies.append(app_leaf)
1336                new_name_parts = ["%04i" % next_number]  # noqa: UP031
1337                if migration_name:
1338                    new_name_parts.append(migration_name)
1339                elif i == 0 and not app_leaf:
1340                    new_name_parts.append("initial")
1341                else:
1342                    new_name_parts.append(migration.suggest_name()[:100])
1343                new_name = "_".join(new_name_parts)
1344                name_map[(package_label, migration.name)] = (package_label, new_name)
1345                next_number += 1
1346                migration.name = new_name
1347        # Now fix dependencies
1348        for migrations in changes.values():
1349            for migration in migrations:
1350                migration.dependencies = [
1351                    name_map.get(d, d) for d in migration.dependencies
1352                ]
1353        return changes
1354
1355    def _trim_to_packages(
1356        self, changes: dict[str, list[Migration]], package_labels: set[str]
1357    ) -> dict[str, list[Migration]]:
1358        """
1359        Take changes from arrange_for_graph() and set of app labels, and return
1360        a modified set of changes which trims out as many migrations that are
1361        not in package_labels as possible. Note that some other migrations may
1362        still be present as they may be required dependencies.
1363        """
1364        # Gather other app dependencies in a first pass
1365        app_dependencies = {}
1366        for package_label, migrations in changes.items():
1367            for migration in migrations:
1368                for dep_package_label, name in migration.dependencies:
1369                    app_dependencies.setdefault(package_label, set()).add(
1370                        dep_package_label
1371                    )
1372        required_packages = set(package_labels)
1373        # Keep resolving till there's no change
1374        old_required_packages = None
1375        while old_required_packages != required_packages:
1376            old_required_packages = set(required_packages)
1377            required_packages.update(
1378                *[
1379                    app_dependencies.get(package_label, ())
1380                    for package_label in required_packages
1381                ]
1382            )
1383        # Remove all migrations that aren't needed
1384        for package_label in list(changes):
1385            if package_label not in required_packages:
1386                del changes[package_label]
1387        return changes
1388
1389    @classmethod
1390    def parse_number(cls, name: str) -> int | None:
1391        """
1392        Given a migration name, try to extract a number from the beginning of
1393        it. For a squashed migration such as '0001_squashed_0004…', return the
1394        second number. If no number is found, return None.
1395        """
1396        if squashed_match := re.search(r".*_squashed_(\d+)", name):
1397            return int(squashed_match[1])
1398        match = re.match(r"^\d+", name)
1399        if match:
1400            return int(match[0])
1401        return None