1from __future__ import annotations
   2
   3import functools
   4import re
   5from graphlib import TopologicalSorter
   6from typing import TYPE_CHECKING, Any
   7
   8from plain.postgres.fields import (
   9    NOT_PROVIDED,
  10    DateField,
  11    DateTimeField,
  12    Field,
  13    TimeField,
  14)
  15from plain.postgres.fields.related import ManyToManyField, RelatedField
  16from plain.postgres.fields.reverse_related import ManyToManyRel
  17from plain.postgres.migrations import operations
  18from plain.postgres.migrations.migration import Migration, SettingsTuple
  19from plain.postgres.migrations.operations.models import AlterModelOptions
  20from plain.postgres.migrations.optimizer import MigrationOptimizer
  21from plain.postgres.migrations.questioner import MigrationQuestioner
  22from plain.postgres.migrations.utils import (
  23    COMPILED_REGEX_TYPE,
  24    RegexObject,
  25    resolve_relation,
  26)
  27from plain.runtime import settings
  28
  29if TYPE_CHECKING:
  30    from plain.postgres.migrations.graph import MigrationGraph
  31    from plain.postgres.migrations.operations.base import Operation
  32    from plain.postgres.migrations.state import ProjectState
  33
  34
  35class MigrationAutodetector:
  36    """
  37    Take a pair of ProjectStates and compare them to see what the first would
  38    need doing to make it match the second (the second usually being the
  39    project's current state).
  40
  41    Note that this naturally operates on entire projects at a time,
  42    as it's likely that changes interact (for example, you can't
  43    add a ForeignKeyField without having a migration to add the table it
  44    depends on first). A user interface may offer single-app usage
  45    if it wishes, with the caveat that it may not always be possible.
  46    """
  47
  48    def __init__(
  49        self,
  50        from_state: ProjectState,
  51        to_state: ProjectState,
  52        questioner: MigrationQuestioner | None = None,
  53    ):
  54        self.from_state = from_state
  55        self.to_state = to_state
  56        self.questioner = questioner or MigrationQuestioner()
  57        self.existing_packages = {app for app, model in from_state.models}
  58
  59    def changes(
  60        self,
  61        graph: MigrationGraph,
  62        trim_to_packages: set[str] | None = None,
  63        convert_packages: set[str] | None = None,
  64        migration_name: str | None = None,
  65    ) -> dict[str, list[Migration]]:
  66        """
  67        Main entry point to produce a list of applicable changes.
  68        Take a graph to base names on and an optional set of packages
  69        to try and restrict to (restriction is not guaranteed)
  70        """
  71        changes = self._detect_changes(convert_packages, graph)
  72        changes = self.arrange_for_graph(changes, graph, migration_name)
  73        if trim_to_packages:
  74            changes = self._trim_to_packages(changes, trim_to_packages)
  75        return changes
  76
  77    def deep_deconstruct(self, obj: Any) -> Any:
  78        """
  79        Recursive deconstruction for a field and its arguments.
  80        Used for full comparison for rename/alter; sometimes a single-level
  81        deconstruction will not compare correctly.
  82        """
  83        if isinstance(obj, list):
  84            return [self.deep_deconstruct(value) for value in obj]
  85        elif isinstance(obj, tuple):
  86            return tuple(self.deep_deconstruct(value) for value in obj)
  87        elif isinstance(obj, dict):
  88            return {key: self.deep_deconstruct(value) for key, value in obj.items()}
  89        elif isinstance(obj, functools.partial):
  90            return (
  91                obj.func,
  92                self.deep_deconstruct(obj.args),
  93                self.deep_deconstruct(obj.keywords),
  94            )
  95        elif isinstance(obj, COMPILED_REGEX_TYPE):
  96            return RegexObject(obj)
  97        elif isinstance(obj, type):
  98            # If this is a type that implements 'deconstruct' as an instance method,
  99            # avoid treating this as being deconstructible itself - see #22951
 100            return obj
 101        elif hasattr(obj, "deconstruct"):
 102            deconstructed = obj.deconstruct()
 103            if isinstance(obj, Field):
 104                # we have a field which also returns a name
 105                deconstructed = deconstructed[1:]
 106            path, args, kwargs = deconstructed
 107            return (
 108                path,
 109                [self.deep_deconstruct(value) for value in args],
 110                {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
 111            )
 112        else:
 113            return obj
 114
 115    def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
 116        """
 117        Return a definition of the fields that ignores field names and
 118        what related fields actually relate to. Used for detecting renames (as
 119        the related fields change during renames).
 120        """
 121        fields_def = []
 122        for name, field in sorted(fields.items()):
 123            deconstruction = self.deep_deconstruct(field)
 124            if isinstance(field, RelatedField) and field.remote_field.model:
 125                deconstruction[2].pop("to", None)
 126            fields_def.append(deconstruction)
 127        return fields_def
 128
 129    def _detect_changes(
 130        self,
 131        convert_packages: set[str] | None = None,
 132        graph: MigrationGraph | None = None,
 133    ) -> dict[str, list[Migration]]:
 134        """
 135        Return a dict of migration plans which will achieve the
 136        change from from_state to to_state. The dict has app labels
 137        as keys and a list of migrations as values.
 138
 139        The resulting migrations aren't specially named, but the names
 140        do matter for dependencies inside the set.
 141
 142        convert_packages is the list of packages to convert to use migrations
 143        (i.e. to make initial migrations for, in the usual case)
 144
 145        graph is an optional argument that, if provided, can help improve
 146        dependency generation and avoid potential circular dependencies.
 147        """
 148        # The first phase is generating all the operations for each app
 149        # and gathering them into a big per-app list.
 150        # Then go through that list, order it, and split into migrations to
 151        # resolve dependencies caused by M2Ms and FKs.
 152        self.generated_operations = {}
 153        self.altered_indexes = {}
 154        self.altered_constraints = {}
 155        self.renamed_fields = {}
 156
 157        # Prepare some old/new state and model lists, ignoring unmigrated packages.
 158        self.old_model_keys = set()
 159        self.new_model_keys = set()
 160        for (package_label, model_name), model_state in self.from_state.models.items():
 161            if package_label not in self.from_state.real_packages:
 162                self.old_model_keys.add((package_label, model_name))
 163
 164        for (package_label, model_name), model_state in self.to_state.models.items():
 165            if package_label not in self.from_state.real_packages or (
 166                convert_packages and package_label in convert_packages
 167            ):
 168                self.new_model_keys.add((package_label, model_name))
 169
 170        self.from_state.resolve_fields_and_relations()
 171        self.to_state.resolve_fields_and_relations()
 172
 173        # Renames have to come first
 174        self.generate_renamed_models()
 175
 176        # Prepare lists of fields and generate through model map
 177        self._prepare_field_lists()
 178        self._generate_through_model_map()
 179
 180        # Generate non-rename model operations
 181        self.generate_deleted_models()
 182        self.generate_created_models()
 183        self.generate_altered_options()
 184
 185        # Create the renamed fields and store them in self.renamed_fields.
 186        # They are used by create_altered_indexes(), generate_altered_fields(),
 187        # generate_removed_altered_index(), and
 188        # generate_altered_index().
 189        self.create_renamed_fields()
 190        # Create the altered indexes and store them in self.altered_indexes.
 191        # This avoids the same computation in generate_removed_indexes()
 192        # and generate_added_indexes().
 193        self.create_altered_indexes()
 194        self.create_altered_constraints()
 195        # Generate index removal operations before field is removed
 196        self.generate_removed_constraints()
 197        self.generate_removed_indexes()
 198        # Generate field renaming operations.
 199        self.generate_renamed_fields()
 200        self.generate_renamed_indexes()
 201        # Generate field operations.
 202        self.generate_removed_fields()
 203        self.generate_added_fields()
 204        self.generate_altered_fields()
 205        self.generate_added_indexes()
 206        self.generate_added_constraints()
 207        self.generate_altered_db_table()
 208
 209        self._sort_migrations()
 210        self._build_migration_list(graph)
 211        self._optimize_migrations()
 212
 213        return self.migrations
 214
 215    def _prepare_field_lists(self) -> None:
 216        """
 217        Prepare field lists and a list of the fields that used through models
 218        in the old state so dependencies can be made from the through model
 219        deletion to the field that uses it.
 220        """
 221        self.kept_model_keys = self.old_model_keys & self.new_model_keys
 222        self.through_users = {}
 223        self.old_field_keys = {
 224            (package_label, model_name, field_name)
 225            for package_label, model_name in self.kept_model_keys
 226            for field_name in self.from_state.models[
 227                package_label,
 228                self.renamed_models.get((package_label, model_name), model_name),
 229            ].fields
 230        }
 231        self.new_field_keys = {
 232            (package_label, model_name, field_name)
 233            for package_label, model_name in self.kept_model_keys
 234            for field_name in self.to_state.models[package_label, model_name].fields
 235        }
 236
 237    def _generate_through_model_map(self) -> None:
 238        """Through model map generation."""
 239        for package_label, model_name in sorted(self.old_model_keys):
 240            old_model_name = self.renamed_models.get(
 241                (package_label, model_name), model_name
 242            )
 243            old_model_state = self.from_state.models[package_label, old_model_name]
 244            for field_name, field in old_model_state.fields.items():
 245                if hasattr(field, "remote_field") and getattr(
 246                    field.remote_field, "through", None
 247                ):
 248                    through_key = resolve_relation(
 249                        field.remote_field.through,  # type: ignore[unresolved-attribute]
 250                        package_label,
 251                        model_name,
 252                    )
 253                    self.through_users[through_key] = (
 254                        package_label,
 255                        old_model_name,
 256                        field_name,
 257                    )
 258
 259    @staticmethod
 260    def _resolve_dependency(
 261        dependency: tuple[str, str, str | None, bool | str],
 262    ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
 263        """
 264        Return the resolved dependency and a boolean denoting whether or not
 265        it was a settings dependency.
 266        """
 267        if not isinstance(dependency, SettingsTuple):
 268            return dependency, False
 269        resolved_package_label, resolved_object_name = getattr(
 270            settings, dependency[1]
 271        ).split(".")
 272        return (resolved_package_label, resolved_object_name.lower()) + dependency[
 273            2:
 274        ], True
 275
    def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
        """
        Chop the lists of operations up into migrations with dependencies on
        each other. Do this by going through an app's list of operations until
        one is found that has an outgoing dependency that isn't in another
        app's migration yet (hasn't been chopped off its list). Then chop off
        the operations before it into a migration and move onto the next app.
        If the loop completes without doing anything, there's a circular
        dependency (which _should_ be impossible as the operations are
        all split at this point so they can't depend and be depended on).

        The result is stored in self.migrations, a dict mapping each package
        label to its list of generated migrations; self.generated_operations
        is consumed in the process. ``graph`` is optional and is only
        consulted (via ``leaf_nodes``) for dependencies on packages that have
        no migration generated here.

        Raises ValueError when a full pass in chop mode makes no progress.
        """
        self.migrations = {}
        # Total number of operations still waiting to be put in a migration.
        num_ops = sum(len(x) for x in self.generated_operations.values())
        # Turned on after a pass that made no progress; turned off again as
        # soon as a migration is created.
        chop_mode = False
        while num_ops:
            # On every iteration, we step through all the packages and see if there
            # is a completed set of operations.
            # If we find that a subset of the operations are complete we can
            # try to chop it off from the rest and continue, but we only
            # do this if we've already been through the list once before
            # without any chopping and nothing has changed.
            for package_label in sorted(self.generated_operations):
                # Leading operations of this package whose dependencies are
                # satisfied, and the migration dependencies they bring along.
                chopped = []
                dependencies = set()
                # Iterate over a copy: satisfied operations are deleted from
                # the real list inside the loop.
                for operation in list(self.generated_operations[package_label]):
                    deps_satisfied = True
                    operation_dependencies = set()
                    for dep in operation._auto_deps:
                        # Temporarily resolve the settings dependency to
                        # prevent circular references. While keeping the
                        # dependency checks on the resolved model, add the
                        # settings dependencies.
                        original_dep = dep
                        dep, is_settings_dep = self._resolve_dependency(dep)
                        if dep[0] != package_label:
                            # External app dependency. See if it's not yet
                            # satisfied: it is unsatisfied while a matching
                            # operation is still pending in the other
                            # package's list.
                            for other_operation in self.generated_operations.get(
                                dep[0], []
                            ):
                                if self.check_dependency(other_operation, dep):
                                    deps_satisfied = False
                                    break
                            if not deps_satisfied:
                                break
                            else:
                                if is_settings_dep:
                                    # Depend on the unresolved settings
                                    # reference, not on the resolved model.
                                    operation_dependencies.add(
                                        (original_dep[0], original_dep[1])
                                    )
                                elif dep[0] in self.migrations:
                                    # Depend on the latest migration generated
                                    # so far for the other package.
                                    operation_dependencies.add(
                                        (dep[0], self.migrations[dep[0]][-1].name)
                                    )
                                else:
                                    # If we can't find the other app, we add a
                                    # first/last dependency, but only if we've
                                    # already been through once and checked
                                    # everything.
                                    if chop_mode:
                                        # If the app already exists, we add a
                                        # dependency on the last migration, as
                                        # we don't know which migration
                                        # contains the target field. If it's
                                        # not yet migrated or has no
                                        # migrations, we use __first__.
                                        if graph and graph.leaf_nodes(dep[0]):
                                            operation_dependencies.add(
                                                graph.leaf_nodes(dep[0])[0]
                                            )
                                        else:
                                            operation_dependencies.add(
                                                (dep[0], "__first__")
                                            )
                                    else:
                                        # No break here: the remaining deps are
                                        # still examined, but the operation
                                        # stays unsatisfied for this pass.
                                        deps_satisfied = False
                    if deps_satisfied:
                        chopped.append(operation)
                        dependencies.update(operation_dependencies)
                        # The loop stops at the first unsatisfied operation,
                        # so the current operation is always the list head.
                        del self.generated_operations[package_label][0]
                    else:
                        break
                # Make a migration! Well, only if there's stuff to put in it
                if dependencies or chopped:
                    # Only emit a migration when the package's list is fully
                    # consumed, or when in chop mode (partial lists allowed).
                    if not self.generated_operations[package_label] or chop_mode:
                        subclass = type(
                            "Migration",
                            (Migration,),
                            {"operations": [], "dependencies": []},
                        )
                        # Migrations are numbered per package: auto_1, auto_2, ...
                        instance = subclass(
                            "auto_%i"  # noqa: UP031
                            % (len(self.migrations.get(package_label, [])) + 1),
                            package_label,
                        )
                        instance.dependencies = list(dependencies)
                        instance.operations = chopped
                        # Flagged initial when the package had no models in
                        # the old state.
                        instance.initial = package_label not in self.existing_packages
                        self.migrations.setdefault(package_label, []).append(instance)
                        chop_mode = False
                    else:
                        # Not allowed to split yet: put the chopped operations
                        # back in front of the remaining ones. The operation
                        # count is unchanged, so this counts as no progress.
                        self.generated_operations[package_label] = (
                            chopped + self.generated_operations[package_label]
                        )
            new_num_ops = sum(len(x) for x in self.generated_operations.values())
            if new_num_ops == num_ops:
                # Nothing was consumed during this pass: first fall back to
                # chop mode, and give up if chop mode did not help either.
                if not chop_mode:
                    chop_mode = True
                else:
                    raise ValueError(
                        f"Cannot resolve operation dependencies: {self.generated_operations!r}"
                    )
            num_ops = new_num_ops
 389
 390    def _sort_migrations(self) -> None:
 391        """
 392        Reorder to make things possible. Reordering may be needed so FKs work
 393        nicely inside the same app.
 394        """
 395        for package_label, ops in sorted(self.generated_operations.items()):
 396            ts = TopologicalSorter()
 397            for op in ops:
 398                ts.add(op)
 399                for dep in op._auto_deps:
 400                    # Resolve intra-app dependencies to handle circular
 401                    # references involving a settings model.
 402                    dep = self._resolve_dependency(dep)[0]
 403                    if dep[0] != package_label:
 404                        continue
 405                    ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
 406            self.generated_operations[package_label] = list(ts.static_order())
 407
 408    def _optimize_migrations(self) -> None:
 409        # Add in internal dependencies among the migrations
 410        for package_label, migrations in self.migrations.items():
 411            for m1, m2 in zip(migrations, migrations[1:]):
 412                m2.dependencies.append((package_label, m1.name))
 413
 414        # De-dupe dependencies
 415        for migrations in self.migrations.values():
 416            for migration in migrations:
 417                migration.dependencies = list(set(migration.dependencies))
 418
 419        # Optimize migrations
 420        for package_label, migrations in self.migrations.items():
 421            for migration in migrations:
 422                migration.operations = MigrationOptimizer().optimize(
 423                    migration.operations, package_label
 424                )
 425
 426    def check_dependency(
 427        self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
 428    ) -> bool:
 429        """
 430        Return True if the given operation depends on the given dependency,
 431        False otherwise.
 432        """
 433        # Created model
 434        if dependency[2] is None and dependency[3] is True:
 435            return (
 436                isinstance(operation, operations.CreateModel)
 437                and operation.name_lower == dependency[1].lower()
 438            )
 439        # Created field
 440        elif dependency[2] is not None and dependency[3] is True:
 441            return (
 442                isinstance(operation, operations.CreateModel)
 443                and operation.name_lower == dependency[1].lower()
 444                and any(dependency[2] == x for x, y in operation.fields)
 445            ) or (
 446                isinstance(operation, operations.AddField)
 447                and operation.model_name_lower == dependency[1].lower()
 448                and operation.name_lower == dependency[2].lower()
 449            )
 450        # Removed field
 451        elif dependency[2] is not None and dependency[3] is False:
 452            return (
 453                isinstance(operation, operations.RemoveField)
 454                and operation.model_name_lower == dependency[1].lower()
 455                and operation.name_lower == dependency[2].lower()
 456            )
 457        # Removed model
 458        elif dependency[2] is None and dependency[3] is False:
 459            return (
 460                isinstance(operation, operations.DeleteModel)
 461                and operation.name_lower == dependency[1].lower()
 462            )
 463        # Field being altered
 464        elif dependency[2] is not None and dependency[3] == "alter":
 465            return (
 466                isinstance(operation, operations.AlterField)
 467                and operation.model_name_lower == dependency[1].lower()
 468                and operation.name_lower == dependency[2].lower()
 469            )
 470        # Unknown dependency. Raise an error.
 471        else:
 472            raise ValueError(f"Can't handle dependency {dependency!r}")
 473
 474    def add_operation(
 475        self,
 476        package_label: str,
 477        operation: Operation,
 478        dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
 479        beginning: bool = False,
 480    ) -> None:
 481        # Operation dependencies are 4-element tuples:
 482        # (package_label, model_name, field_name, create/delete as True/False or "alter")
 483        operation._auto_deps = dependencies or []
 484        if beginning:
 485            self.generated_operations.setdefault(package_label, []).insert(0, operation)
 486        else:
 487            self.generated_operations.setdefault(package_label, []).append(operation)
 488
 489    def generate_renamed_models(self) -> None:
 490        """
 491        Find any renamed models, generate the operations for them, and remove
 492        the old entry from the model lists. Must be run before other
 493        model-level generation.
 494        """
 495        self.renamed_models = {}
 496        self.renamed_models_rel = {}
 497        added_models = self.new_model_keys - self.old_model_keys
 498        for package_label, model_name in sorted(added_models):
 499            model_state = self.to_state.models[package_label, model_name]
 500            model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
 501
 502            removed_models = self.old_model_keys - self.new_model_keys
 503            for rem_package_label, rem_model_name in removed_models:
 504                if rem_package_label == package_label:
 505                    rem_model_state = self.from_state.models[
 506                        rem_package_label, rem_model_name
 507                    ]
 508                    rem_model_fields_def = self.only_relation_agnostic_fields(
 509                        rem_model_state.fields
 510                    )
 511                    if model_fields_def == rem_model_fields_def:
 512                        if self.questioner.ask_rename_model(
 513                            rem_model_state, model_state
 514                        ):
 515                            dependencies = []
 516                            fields = list(model_state.fields.values()) + [
 517                                field.remote_field
 518                                for relations in self.to_state.relations[
 519                                    package_label, model_name
 520                                ].values()
 521                                for field in relations.values()
 522                                if isinstance(field, RelatedField)
 523                            ]
 524                            for field in fields:
 525                                if isinstance(field, RelatedField):
 526                                    dependencies.extend(
 527                                        self._get_dependencies_for_foreign_key(
 528                                            package_label,
 529                                            model_name,
 530                                            field,
 531                                            self.to_state,
 532                                        )
 533                                    )
 534                            self.add_operation(
 535                                package_label,
 536                                operations.RenameModel(
 537                                    old_name=rem_model_state.name,
 538                                    new_name=model_state.name,
 539                                ),
 540                                dependencies=dependencies,
 541                            )
 542                            self.renamed_models[package_label, model_name] = (
 543                                rem_model_name
 544                            )
 545                            renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
 546                            self.renamed_models_rel[renamed_models_rel_key] = (
 547                                f"{model_state.package_label}.{model_state.name_lower}"
 548                            )
 549                            self.old_model_keys.remove(
 550                                (rem_package_label, rem_model_name)
 551                            )
 552                            self.old_model_keys.add((package_label, model_name))
 553                            break
 554
 555    def generate_created_models(self) -> None:
 556        """
 557        Find all new models and make create
 558        operations for them as well as separate operations to create any
 559        foreign key or M2M relationships (these are optimized later, if
 560        possible).
 561
 562        Defer any model options that refer to collections of fields that might
 563        be deferred.
 564        """
 565        added_models = self.new_model_keys - self.old_model_keys
 566
 567        for package_label, model_name in added_models:
 568            model_state = self.to_state.models[package_label, model_name]
 569            # Gather related fields
 570            related_fields = {}
 571            primary_key_rel = None
 572            for field_name, field in model_state.fields.items():
 573                if isinstance(field, RelatedField):
 574                    if field.remote_field.model:
 575                        if field.primary_key:
 576                            primary_key_rel = field.remote_field.model
 577                        else:
 578                            related_fields[field_name] = field
 579                    if isinstance(field.remote_field, ManyToManyRel):
 580                        related_fields[field_name] = field
 581
 582            # Are there indexes to defer?
 583            indexes = model_state.options.pop("indexes")
 584            constraints = model_state.options.pop("constraints")
 585            # Depend on the deletion of any possible proxy version of us
 586            dependencies: list[tuple[str, str, str | None, bool | str]] = [
 587                (package_label, model_name, None, False),
 588            ]
 589            # Depend on all bases
 590            for base in model_state.bases:
 591                if isinstance(base, str) and "." in base:
 592                    base_package_label, base_name = base.split(".", 1)
 593                    dependencies.append((base_package_label, base_name, None, True))
 594                    # Depend on the removal of base fields if the new model has
 595                    # a field with the same name.
 596                    old_base_model_state = self.from_state.models.get(
 597                        (base_package_label, base_name)
 598                    )
 599                    new_base_model_state = self.to_state.models.get(
 600                        (base_package_label, base_name)
 601                    )
 602                    if old_base_model_state and new_base_model_state:
 603                        removed_base_fields = (
 604                            set(old_base_model_state.fields)
 605                            .difference(
 606                                new_base_model_state.fields,
 607                            )
 608                            .intersection(model_state.fields)
 609                        )
 610                        for removed_base_field in removed_base_fields:
 611                            dependencies.append(
 612                                (
 613                                    base_package_label,
 614                                    base_name,
 615                                    removed_base_field,
 616                                    False,
 617                                )
 618                            )
 619            # Depend on the other end of the primary key if it's a relation
 620            if primary_key_rel:
 621                dep_pl, dep_mn = resolve_relation(
 622                    primary_key_rel,
 623                    package_label,
 624                    model_name,
 625                )
 626                dependencies.append((dep_pl, dep_mn, None, True))
 627            # Generate creation operation
 628            self.add_operation(
 629                package_label,
 630                operations.CreateModel(
 631                    name=model_state.name,
 632                    fields=[
 633                        d
 634                        for d in model_state.fields.items()
 635                        if d[0] not in related_fields
 636                    ],
 637                    options=model_state.options,
 638                    bases=model_state.bases,
 639                ),
 640                dependencies=dependencies,
 641                beginning=True,
 642            )
 643
 644            # Generate operations for each related field
 645            for name, field in sorted(related_fields.items()):
 646                dependencies = self._get_dependencies_for_foreign_key(
 647                    package_label,
 648                    model_name,
 649                    field,
 650                    self.to_state,
 651                )
 652                # Depend on our own model being created
 653                dependencies.append((package_label, model_name, None, True))
 654                # Make operation
 655                self.add_operation(
 656                    package_label,
 657                    operations.AddField(
 658                        model_name=model_name,
 659                        name=name,
 660                        field=field,
 661                    ),
 662                    dependencies=list(set(dependencies)),
 663                )
 664
 665            related_dependencies: list[tuple[str, str, str | None, bool | str]] = [
 666                (package_label, model_name, name, True)
 667                for name in sorted(related_fields)
 668            ]
 669            related_dependencies.append((package_label, model_name, None, True))
 670            for index in indexes:
 671                self.add_operation(
 672                    package_label,
 673                    operations.AddIndex(
 674                        model_name=model_name,
 675                        index=index,
 676                    ),
 677                    dependencies=related_dependencies,
 678                )
 679            for constraint in constraints:
 680                self.add_operation(
 681                    package_label,
 682                    operations.AddConstraint(
 683                        model_name=model_name,
 684                        constraint=constraint,
 685                    ),
 686                    dependencies=related_dependencies,
 687                )
 688
 689    def generate_deleted_models(self) -> None:
 690        """
 691        Find all deleted models and make delete
 692        operations for them as well as separate operations to delete any
 693        foreign key or M2M relationships (these are optimized later, if
 694        possible).
 695
 696        Also bring forward removal of any model options that refer to
 697        collections of fields - the inverse of generate_created_models().
 698        """
 699        deleted_models = self.old_model_keys - self.new_model_keys
 700
 701        for package_label, model_name in sorted(deleted_models):
 702            model_state = self.from_state.models[package_label, model_name]
 703            # Gather related fields
 704            related_fields = {}
 705            for field_name, field in model_state.fields.items():
 706                if isinstance(field, RelatedField):
 707                    if field.remote_field.model:
 708                        related_fields[field_name] = field
 709                    if isinstance(field.remote_field, ManyToManyRel):
 710                        related_fields[field_name] = field
 711
 712            # Then remove each related field
 713            for name in sorted(related_fields):
 714                self.add_operation(
 715                    package_label,
 716                    operations.RemoveField(
 717                        model_name=model_name,
 718                        name=name,
 719                    ),
 720                )
 721            # Finally, remove the model.
 722            # This depends on both the removal/alteration of all incoming fields
 723            # and the removal of all its own related fields, and if it's
 724            # a through model the field that references it.
 725            dependencies = []
 726            relations = self.from_state.relations
 727            for (
 728                related_object_package_label,
 729                object_name,
 730            ), relation_related_fields in relations[package_label, model_name].items():
 731                for field_name, field in relation_related_fields.items():
 732                    dependencies.append(
 733                        (related_object_package_label, object_name, field_name, False),
 734                    )
 735                    if not isinstance(field, ManyToManyField):
 736                        dependencies.append(
 737                            (
 738                                related_object_package_label,
 739                                object_name,
 740                                field_name,
 741                                "alter",
 742                            ),
 743                        )
 744
 745            for name in sorted(related_fields):
 746                dependencies.append((package_label, model_name, name, False))
 747            # We're referenced in another field's through=
 748            through_user = self.through_users.get(
 749                (package_label, model_state.name_lower)
 750            )
 751            if through_user:
 752                dependencies.append(
 753                    (through_user[0], through_user[1], through_user[2], False)
 754                )
 755            # Finally, make the operation, deduping any dependencies
 756            self.add_operation(
 757                package_label,
 758                operations.DeleteModel(
 759                    name=model_state.name,
 760                ),
 761                dependencies=list(set(dependencies)),
 762            )
 763
 764    def create_renamed_fields(self) -> None:
 765        """Work out renamed fields."""
 766        self.renamed_operations = []
 767        old_field_keys = self.old_field_keys.copy()
 768        for package_label, model_name, field_name in sorted(
 769            self.new_field_keys - old_field_keys
 770        ):
 771            old_model_name = self.renamed_models.get(
 772                (package_label, model_name), model_name
 773            )
 774            old_model_state = self.from_state.models[package_label, old_model_name]
 775            new_model_state = self.to_state.models[package_label, model_name]
 776            field = new_model_state.get_field(field_name)
 777            # Scan to see if this is actually a rename!
 778            field_dec = self.deep_deconstruct(field)
 779            for rem_package_label, rem_model_name, rem_field_name in sorted(
 780                old_field_keys - self.new_field_keys
 781            ):
 782                if rem_package_label == package_label and rem_model_name == model_name:
 783                    old_field = old_model_state.get_field(rem_field_name)
 784                    old_field_dec = self.deep_deconstruct(old_field)
 785                    if (
 786                        isinstance(field, RelatedField)
 787                        and field.remote_field
 788                        and field.remote_field.model
 789                        and "to" in old_field_dec[2]
 790                    ):
 791                        old_rel_to = old_field_dec[2]["to"]
 792                        if old_rel_to in self.renamed_models_rel:
 793                            old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
 794                    old_field.set_attributes_from_name(rem_field_name)
 795                    if old_field_dec == field_dec:
 796                        if self.questioner.ask_rename(
 797                            model_name, rem_field_name, field_name, field
 798                        ):
 799                            self.renamed_operations.append(
 800                                (
 801                                    rem_package_label,
 802                                    rem_model_name,
 803                                    rem_field_name,
 804                                    package_label,
 805                                    model_name,
 806                                    field,
 807                                    field_name,
 808                                )
 809                            )
 810                            old_field_keys.remove(
 811                                (rem_package_label, rem_model_name, rem_field_name)
 812                            )
 813                            old_field_keys.add((package_label, model_name, field_name))
 814                            self.renamed_fields[
 815                                package_label, model_name, field_name
 816                            ] = rem_field_name
 817                            break
 818
 819    def generate_renamed_fields(self) -> None:
 820        """Generate RenameField operations."""
 821        for (
 822            rem_package_label,
 823            rem_model_name,
 824            rem_field_name,
 825            package_label,
 826            model_name,
 827            field,
 828            field_name,
 829        ) in self.renamed_operations:
 830            self.add_operation(
 831                package_label,
 832                operations.RenameField(
 833                    model_name=model_name,
 834                    old_name=rem_field_name,
 835                    new_name=field_name,
 836                ),
 837            )
 838            self.old_field_keys.remove(
 839                (rem_package_label, rem_model_name, rem_field_name)
 840            )
 841            self.old_field_keys.add((package_label, model_name, field_name))
 842
 843    def generate_added_fields(self) -> None:
 844        """Make AddField operations."""
 845        for package_label, model_name, field_name in sorted(
 846            self.new_field_keys - self.old_field_keys
 847        ):
 848            self._generate_added_field(package_label, model_name, field_name)
 849
 850    def _generate_added_field(
 851        self, package_label: str, model_name: str, field_name: str
 852    ) -> None:
 853        field = self.to_state.models[package_label, model_name].get_field(field_name)
 854        # Adding a field always depends at least on its removal.
 855        dependencies: list[tuple[str, str, str | None, bool | str]] = [
 856            (package_label, model_name, field_name, False)
 857        ]
 858        # Fields that are foreignkeys/m2ms depend on stuff.
 859        if isinstance(field, RelatedField) and field.remote_field.model:
 860            dependencies.extend(
 861                self._get_dependencies_for_foreign_key(
 862                    package_label,
 863                    model_name,
 864                    field,
 865                    self.to_state,
 866                )
 867            )
 868        # You can't just add NOT NULL fields with no default or fields
 869        # which don't allow empty strings as default.
 870        time_fields = (DateField, DateTimeField, TimeField)
 871        preserve_default = (
 872            field.allow_null
 873            or field.has_default()
 874            or isinstance(field, ManyToManyField)
 875            or (not field.required and field.empty_strings_allowed)
 876            or (isinstance(field, time_fields) and field.auto_now)
 877        )
 878        if not preserve_default:
 879            field = field.clone()
 880            if isinstance(field, time_fields) and field.auto_now_add:
 881                field.default = self.questioner.ask_auto_now_add_addition(
 882                    field_name, model_name
 883                )
 884            else:
 885                field.default = self.questioner.ask_not_null_addition(
 886                    field_name, model_name
 887                )
 888        if (
 889            field.primary_key
 890            and field.default is not NOT_PROVIDED
 891            and callable(field.default)
 892        ):
 893            self.questioner.ask_unique_callable_default_addition(field_name, model_name)
 894        self.add_operation(
 895            package_label,
 896            operations.AddField(
 897                model_name=model_name,
 898                name=field_name,
 899                field=field,
 900                preserve_default=preserve_default,
 901            ),
 902            dependencies=dependencies,
 903        )
 904
 905    def generate_removed_fields(self) -> None:
 906        """Make RemoveField operations."""
 907        for package_label, model_name, field_name in sorted(
 908            self.old_field_keys - self.new_field_keys
 909        ):
 910            self._generate_removed_field(package_label, model_name, field_name)
 911
 912    def _generate_removed_field(
 913        self, package_label: str, model_name: str, field_name: str
 914    ) -> None:
 915        self.add_operation(
 916            package_label,
 917            operations.RemoveField(
 918                model_name=model_name,
 919                name=field_name,
 920            ),
 921        )
 922
    def generate_altered_fields(self) -> None:
        """
        Make AlterField operations, or possibly RemoveField/AddField if alter
        isn't possible.

        Only field keys present in both the old and the new key sets are
        examined. A field is treated as altered when its deconstructed old and
        new definitions differ and it was not itself renamed. References to
        renamed models (and to renamed target fields) are repointed on the new
        field before the comparison so that they do not count as a change.
        """
        for package_label, model_name, field_name in sorted(
            self.old_field_keys & self.new_field_keys
        ):
            # Did the field change?
            # Map the new model/field names back to their pre-rename names so
            # the matching field can be looked up in the old state.
            old_model_name = self.renamed_models.get(
                (package_label, model_name), model_name
            )
            old_field_name = self.renamed_fields.get(
                (package_label, model_name, field_name), field_name
            )
            old_field = self.from_state.models[package_label, old_model_name].get_field(
                old_field_name
            )
            new_field = self.to_state.models[package_label, model_name].get_field(
                field_name
            )
            dependencies: list[tuple[str, str, str | None, bool | str]] = []
            # Implement any model renames on relations; these are handled by RenameModel
            # so we need to exclude them from the comparison
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "model", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.model,  # type: ignore[unresolved-attribute]
                    package_label,
                    model_name,
                )
                if rename_key in self.renamed_models:
                    # NOTE: this mutates the field object obtained from to_state.
                    new_field.remote_field.model = old_field.remote_field.model  # type: ignore[unresolved-attribute]
                # Handle ForeignKeyField which can only have a single to_field.
                remote_field_name = getattr(new_field.remote_field, "field_name", None)
                if remote_field_name:
                    to_field_rename_key = rename_key + (remote_field_name,)
                    if to_field_rename_key in self.renamed_fields:
                        # Repoint model name only
                        new_field.remote_field.model = old_field.remote_field.model  # type: ignore[unresolved-attribute]
                # Any resulting operation must run after the related model(s) exist.
                dependencies.extend(
                    self._get_dependencies_for_foreign_key(
                        package_label,
                        model_name,
                        new_field,
                        self.to_state,
                    )
                )
            # Same treatment for a renamed "through" model.
            if hasattr(new_field, "remote_field") and getattr(
                new_field.remote_field, "through", None
            ):
                rename_key = resolve_relation(
                    new_field.remote_field.through,  # type: ignore[unresolved-attribute]
                    package_label,
                    model_name,
                )
                if rename_key in self.renamed_models:
                    new_field.remote_field.through = old_field.remote_field.through  # type: ignore[unresolved-attribute]
            # Compare only after the repointing above has been applied.
            old_field_dec = self.deep_deconstruct(old_field)
            new_field_dec = self.deep_deconstruct(new_field)
            # Fields recorded as renamed (old_field_name != field_name) are
            # skipped here.
            if old_field_dec != new_field_dec and old_field_name == field_name:
                both_m2m = isinstance(old_field, ManyToManyField) and isinstance(
                    new_field, ManyToManyField
                )
                neither_m2m = not isinstance(
                    old_field, ManyToManyField
                ) and not isinstance(new_field, ManyToManyField)
                if both_m2m or neither_m2m:
                    # Either both fields are m2m or neither is
                    preserve_default = True
                    # A nullable field becoming non-nullable without a default:
                    # ask the questioner for a one-off default.
                    if (
                        old_field.allow_null
                        and not new_field.allow_null
                        and not new_field.has_default()
                        and not isinstance(new_field, ManyToManyField)
                    ):
                        # Clone so the one-off default is not set on new_field.
                        field = new_field.clone()
                        new_default = self.questioner.ask_not_null_alteration(
                            field_name, model_name
                        )
                        if new_default is not NOT_PROVIDED:
                            field.default = new_default
                            preserve_default = False
                    else:
                        field = new_field
                    self.add_operation(
                        package_label,
                        operations.AlterField(
                            model_name=model_name,
                            name=field_name,
                            field=field,
                            preserve_default=preserve_default,
                        ),
                        dependencies=dependencies,
                    )
                else:
                    # We cannot alter between m2m and concrete fields
                    self._generate_removed_field(package_label, model_name, field_name)
                    self._generate_added_field(package_label, model_name, field_name)
1023
1024    def create_altered_indexes(self) -> None:
1025        option_name = operations.AddIndex.option_name
1026
1027        for package_label, model_name in sorted(self.kept_model_keys):
1028            old_model_name = self.renamed_models.get(
1029                (package_label, model_name), model_name
1030            )
1031            old_model_state = self.from_state.models[package_label, old_model_name]
1032            new_model_state = self.to_state.models[package_label, model_name]
1033
1034            old_indexes = old_model_state.options[option_name]
1035            new_indexes = new_model_state.options[option_name]
1036            added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1037            removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1038            renamed_indexes = []
1039            # Find renamed indexes.
1040            remove_from_added = []
1041            remove_from_removed = []
1042            for new_index in added_indexes:
1043                new_index_dec = new_index.deconstruct()
1044                new_index_name = new_index_dec[2].pop("name")
1045                for old_index in removed_indexes:
1046                    old_index_dec = old_index.deconstruct()
1047                    old_index_name = old_index_dec[2].pop("name")
1048                    # Indexes are the same except for the names.
1049                    if (
1050                        new_index_dec == old_index_dec
1051                        and new_index_name != old_index_name
1052                    ):
1053                        renamed_indexes.append((old_index_name, new_index_name, None))
1054                        remove_from_added.append(new_index)
1055                        remove_from_removed.append(old_index)
1056
1057            # Remove renamed indexes from the lists of added and removed
1058            # indexes.
1059            added_indexes = [
1060                idx for idx in added_indexes if idx not in remove_from_added
1061            ]
1062            removed_indexes = [
1063                idx for idx in removed_indexes if idx not in remove_from_removed
1064            ]
1065
1066            self.altered_indexes.update(
1067                {
1068                    (package_label, model_name): {
1069                        "added_indexes": added_indexes,
1070                        "removed_indexes": removed_indexes,
1071                        "renamed_indexes": renamed_indexes,
1072                    }
1073                }
1074            )
1075
1076    def generate_added_indexes(self) -> None:
1077        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1078            dependencies = self._get_dependencies_for_model(package_label, model_name)
1079            for index in alt_indexes["added_indexes"]:
1080                self.add_operation(
1081                    package_label,
1082                    operations.AddIndex(
1083                        model_name=model_name,
1084                        index=index,
1085                    ),
1086                    dependencies=dependencies,
1087                )
1088
1089    def generate_removed_indexes(self) -> None:
1090        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1091            for index in alt_indexes["removed_indexes"]:
1092                self.add_operation(
1093                    package_label,
1094                    operations.RemoveIndex(
1095                        model_name=model_name,
1096                        name=index.name,
1097                    ),
1098                )
1099
1100    def generate_renamed_indexes(self) -> None:
1101        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1102            for old_index_name, new_index_name, old_fields in alt_indexes[
1103                "renamed_indexes"
1104            ]:
1105                self.add_operation(
1106                    package_label,
1107                    operations.RenameIndex(
1108                        model_name=model_name,
1109                        new_name=new_index_name,
1110                        old_name=old_index_name,
1111                        old_fields=old_fields,
1112                    ),
1113                )
1114
1115    def create_altered_constraints(self) -> None:
1116        option_name = operations.AddConstraint.option_name
1117        for package_label, model_name in sorted(self.kept_model_keys):
1118            old_model_name = self.renamed_models.get(
1119                (package_label, model_name), model_name
1120            )
1121            old_model_state = self.from_state.models[package_label, old_model_name]
1122            new_model_state = self.to_state.models[package_label, model_name]
1123
1124            old_constraints = old_model_state.options[option_name]
1125            new_constraints = new_model_state.options[option_name]
1126            add_constraints = [c for c in new_constraints if c not in old_constraints]
1127            rem_constraints = [c for c in old_constraints if c not in new_constraints]
1128
1129            self.altered_constraints.update(
1130                {
1131                    (package_label, model_name): {
1132                        "added_constraints": add_constraints,
1133                        "removed_constraints": rem_constraints,
1134                    }
1135                }
1136            )
1137
1138    def generate_added_constraints(self) -> None:
1139        for (
1140            package_label,
1141            model_name,
1142        ), alt_constraints in self.altered_constraints.items():
1143            dependencies = self._get_dependencies_for_model(package_label, model_name)
1144            for constraint in alt_constraints["added_constraints"]:
1145                self.add_operation(
1146                    package_label,
1147                    operations.AddConstraint(
1148                        model_name=model_name,
1149                        constraint=constraint,
1150                    ),
1151                    dependencies=dependencies,
1152                )
1153
1154    def generate_removed_constraints(self) -> None:
1155        for (
1156            package_label,
1157            model_name,
1158        ), alt_constraints in self.altered_constraints.items():
1159            for constraint in alt_constraints["removed_constraints"]:
1160                self.add_operation(
1161                    package_label,
1162                    operations.RemoveConstraint(
1163                        model_name=model_name,
1164                        name=constraint.name,
1165                    ),
1166                )
1167
1168    @staticmethod
1169    def _get_dependencies_for_foreign_key(
1170        package_label: str, model_name: str, field: Field, project_state: ProjectState
1171    ) -> list[tuple[str, str, str | None, bool | str]]:
1172        remote_field_model = None
1173        if isinstance(field, RelatedField):
1174            remote_field_model = field.remote_field.model
1175        else:
1176            relations = project_state.relations[package_label, model_name]
1177            for (remote_package_label, remote_model_name), fields in relations.items():
1178                if any(
1179                    field == related_field.remote_field
1180                    for related_field in fields.values()
1181                    if isinstance(related_field, RelatedField)
1182                ):
1183                    remote_field_model = f"{remote_package_label}.{remote_model_name}"
1184                    break
1185        dep_package_label, dep_object_name = resolve_relation(
1186            remote_field_model,
1187            package_label,
1188            model_name,
1189        )
1190        dependencies: list[tuple[str, str, str | None, bool | str]] = [
1191            (dep_package_label, dep_object_name, None, True)
1192        ]
1193        if isinstance(field, RelatedField) and isinstance(
1194            field.remote_field, ManyToManyRel
1195        ):
1196            through_package_label, through_object_name = resolve_relation(
1197                field.remote_field.through,
1198                package_label,
1199                model_name,
1200            )
1201            dependencies.append(
1202                (through_package_label, through_object_name, None, True)
1203            )
1204        return dependencies
1205
1206    def _get_dependencies_for_model(
1207        self, package_label: str, model_name: str
1208    ) -> list[tuple[str, str, str | None, bool | str]]:
1209        """Return foreign key dependencies of the given model."""
1210        dependencies = []
1211        model_state = self.to_state.models[package_label, model_name]
1212        for field in model_state.fields.values():
1213            if isinstance(field, RelatedField):
1214                dependencies.extend(
1215                    self._get_dependencies_for_foreign_key(
1216                        package_label,
1217                        model_name,
1218                        field,
1219                        self.to_state,
1220                    )
1221                )
1222        return dependencies
1223
1224    def generate_altered_db_table(self) -> None:
1225        for package_label, model_name in sorted(self.kept_model_keys):
1226            old_model_name = self.renamed_models.get(
1227                (package_label, model_name), model_name
1228            )
1229            old_model_state = self.from_state.models[package_label, old_model_name]
1230            new_model_state = self.to_state.models[package_label, model_name]
1231            old_db_table_name = old_model_state.options.get("db_table")
1232            new_db_table_name = new_model_state.options.get("db_table")
1233            if old_db_table_name != new_db_table_name:
1234                self.add_operation(
1235                    package_label,
1236                    operations.AlterModelTable(
1237                        name=model_name,
1238                        table=new_db_table_name,
1239                    ),
1240                )
1241
1242    def generate_altered_options(self) -> None:
1243        """
1244        Work out if any non-schema-affecting options have changed and make an
1245        operation to represent them in state changes (in case Python code in
1246        migrations needs them).
1247        """
1248        for package_label, model_name in sorted(self.kept_model_keys):
1249            old_model_name = self.renamed_models.get(
1250                (package_label, model_name), model_name
1251            )
1252            old_model_state = self.from_state.models[package_label, old_model_name]
1253            new_model_state = self.to_state.models[package_label, model_name]
1254            old_options = {
1255                key: value
1256                for key, value in old_model_state.options.items()
1257                if key in AlterModelOptions.ALTER_OPTION_KEYS
1258            }
1259            new_options = {
1260                key: value
1261                for key, value in new_model_state.options.items()
1262                if key in AlterModelOptions.ALTER_OPTION_KEYS
1263            }
1264            if old_options != new_options:
1265                self.add_operation(
1266                    package_label,
1267                    operations.AlterModelOptions(
1268                        name=model_name,
1269                        options=new_options,
1270                    ),
1271                )
1272
1273    def arrange_for_graph(
1274        self,
1275        changes: dict[str, list[Migration]],
1276        graph: MigrationGraph,
1277        migration_name: str | None = None,
1278    ) -> dict[str, list[Migration]]:
1279        """
1280        Take a result from changes() and a MigrationGraph, and fix the names
1281        and dependencies of the changes so they extend the graph from the leaf
1282        nodes for each app.
1283        """
1284        leaves = graph.leaf_nodes()
1285        name_map = {}
1286        for package_label, migrations in list(changes.items()):
1287            if not migrations:
1288                continue
1289            # Find the app label's current leaf node
1290            app_leaf = None
1291            for leaf in leaves:
1292                if leaf[0] == package_label:
1293                    app_leaf = leaf
1294                    break
1295            # Do they want an initial migration for this app?
1296            if app_leaf is None and not self.questioner.ask_initial(package_label):
1297                # They don't.
1298                for migration in migrations:
1299                    name_map[(package_label, migration.name)] = (
1300                        package_label,
1301                        "__first__",
1302                    )
1303                del changes[package_label]
1304                continue
1305            # Work out the next number in the sequence
1306            if app_leaf is None:
1307                next_number = 1
1308            else:
1309                next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1310            # Name each migration
1311            for i, migration in enumerate(migrations):
1312                if i == 0 and app_leaf:
1313                    migration.dependencies.append(app_leaf)
1314                new_name_parts = ["%04i" % next_number]  # noqa: UP031
1315                if migration_name:
1316                    new_name_parts.append(migration_name)
1317                elif i == 0 and not app_leaf:
1318                    new_name_parts.append("initial")
1319                else:
1320                    new_name_parts.append(migration.suggest_name()[:100])
1321                new_name = "_".join(new_name_parts)
1322                name_map[(package_label, migration.name)] = (package_label, new_name)
1323                next_number += 1
1324                migration.name = new_name
1325        # Now fix dependencies
1326        for migrations in changes.values():
1327            for migration in migrations:
1328                migration.dependencies = [
1329                    name_map.get(d, d) for d in migration.dependencies
1330                ]
1331        return changes
1332
1333    def _trim_to_packages(
1334        self, changes: dict[str, list[Migration]], package_labels: set[str]
1335    ) -> dict[str, list[Migration]]:
1336        """
1337        Take changes from arrange_for_graph() and set of app labels, and return
1338        a modified set of changes which trims out as many migrations that are
1339        not in package_labels as possible. Note that some other migrations may
1340        still be present as they may be required dependencies.
1341        """
1342        # Gather other app dependencies in a first pass
1343        app_dependencies = {}
1344        for package_label, migrations in changes.items():
1345            for migration in migrations:
1346                for dep_package_label, name in migration.dependencies:
1347                    app_dependencies.setdefault(package_label, set()).add(
1348                        dep_package_label
1349                    )
1350        required_packages = set(package_labels)
1351        # Keep resolving till there's no change
1352        old_required_packages = None
1353        while old_required_packages != required_packages:
1354            old_required_packages = set(required_packages)
1355            required_packages.update(
1356                *[
1357                    app_dependencies.get(package_label, ())
1358                    for package_label in required_packages
1359                ]
1360            )
1361        # Remove all migrations that aren't needed
1362        for package_label in list(changes):
1363            if package_label not in required_packages:
1364                del changes[package_label]
1365        return changes
1366
1367    @classmethod
1368    def parse_number(cls, name: str) -> int | None:
1369        """
1370        Given a migration name, try to extract a number from the beginning of
1371        it. For a squashed migration such as '0001_squashed_0004…', return the
1372        second number. If no number is found, return None.
1373        """
1374        if squashed_match := re.search(r".*_squashed_(\d+)", name):
1375            return int(squashed_match[1])
1376        match = re.match(r"^\d+", name)
1377        if match:
1378            return int(match[0])
1379        return None