1from __future__ import annotations
   2
   3import functools
   4import re
   5from graphlib import TopologicalSorter
   6from typing import TYPE_CHECKING, Any
   7
   8from plain.models.fields import (
   9    NOT_PROVIDED,
  10    DateField,
  11    DateTimeField,
  12    Field,
  13    TimeField,
  14)
  15from plain.models.fields.related import ManyToManyField, RelatedField
  16from plain.models.fields.reverse_related import ManyToManyRel
  17from plain.models.migrations import operations
  18from plain.models.migrations.migration import Migration, SettingsTuple
  19from plain.models.migrations.operations.models import AlterModelOptions
  20from plain.models.migrations.optimizer import MigrationOptimizer
  21from plain.models.migrations.questioner import MigrationQuestioner
  22from plain.models.migrations.utils import (
  23    COMPILED_REGEX_TYPE,
  24    RegexObject,
  25    resolve_relation,
  26)
  27from plain.runtime import settings
  28
  29if TYPE_CHECKING:
  30    from plain.models.migrations.graph import MigrationGraph
  31    from plain.models.migrations.operations.base import Operation
  32    from plain.models.migrations.state import ProjectState
  33
  34
  35class MigrationAutodetector:
  36    """
  37    Take a pair of ProjectStates and compare them to see what the first would
  38    need doing to make it match the second (the second usually being the
  39    project's current state).
  40
  41    Note that this naturally operates on entire projects at a time,
  42    as it's likely that changes interact (for example, you can't
  43    add a ForeignKeyField without having a migration to add the table it
  44    depends on first). A user interface may offer single-app usage
  45    if it wishes, with the caveat that it may not always be possible.
  46    """
  47
  48    def __init__(
  49        self,
  50        from_state: ProjectState,
  51        to_state: ProjectState,
  52        questioner: MigrationQuestioner | None = None,
  53    ):
  54        self.from_state = from_state
  55        self.to_state = to_state
  56        self.questioner = questioner or MigrationQuestioner()
  57        self.existing_packages = {app for app, model in from_state.models}
  58
  59    def changes(
  60        self,
  61        graph: MigrationGraph,
  62        trim_to_packages: set[str] | None = None,
  63        convert_packages: set[str] | None = None,
  64        migration_name: str | None = None,
  65    ) -> dict[str, list[Migration]]:
  66        """
  67        Main entry point to produce a list of applicable changes.
  68        Take a graph to base names on and an optional set of packages
  69        to try and restrict to (restriction is not guaranteed)
  70        """
  71        changes = self._detect_changes(convert_packages, graph)
  72        changes = self.arrange_for_graph(changes, graph, migration_name)
  73        if trim_to_packages:
  74            changes = self._trim_to_packages(changes, trim_to_packages)
  75        return changes
  76
  77    def deep_deconstruct(self, obj: Any) -> Any:
  78        """
  79        Recursive deconstruction for a field and its arguments.
  80        Used for full comparison for rename/alter; sometimes a single-level
  81        deconstruction will not compare correctly.
  82        """
  83        if isinstance(obj, list):
  84            return [self.deep_deconstruct(value) for value in obj]
  85        elif isinstance(obj, tuple):
  86            return tuple(self.deep_deconstruct(value) for value in obj)
  87        elif isinstance(obj, dict):
  88            return {key: self.deep_deconstruct(value) for key, value in obj.items()}
  89        elif isinstance(obj, functools.partial):
  90            return (
  91                obj.func,
  92                self.deep_deconstruct(obj.args),
  93                self.deep_deconstruct(obj.keywords),
  94            )
  95        elif isinstance(obj, COMPILED_REGEX_TYPE):
  96            return RegexObject(obj)
  97        elif isinstance(obj, type):
  98            # If this is a type that implements 'deconstruct' as an instance method,
  99            # avoid treating this as being deconstructible itself - see #22951
 100            return obj
 101        elif hasattr(obj, "deconstruct"):
 102            deconstructed = obj.deconstruct()
 103            if isinstance(obj, Field):
 104                # we have a field which also returns a name
 105                deconstructed = deconstructed[1:]
 106            path, args, kwargs = deconstructed
 107            return (
 108                path,
 109                [self.deep_deconstruct(value) for value in args],
 110                {key: self.deep_deconstruct(value) for key, value in kwargs.items()},
 111            )
 112        else:
 113            return obj
 114
 115    def only_relation_agnostic_fields(self, fields: dict[str, Field]) -> list[Any]:
 116        """
 117        Return a definition of the fields that ignores field names and
 118        what related fields actually relate to. Used for detecting renames (as
 119        the related fields change during renames).
 120        """
 121        fields_def = []
 122        for name, field in sorted(fields.items()):
 123            deconstruction = self.deep_deconstruct(field)
 124            if isinstance(field, RelatedField) and field.remote_field.model:
 125                deconstruction[2].pop("to", None)
 126            fields_def.append(deconstruction)
 127        return fields_def
 128
 129    def _detect_changes(
 130        self,
 131        convert_packages: set[str] | None = None,
 132        graph: MigrationGraph | None = None,
 133    ) -> dict[str, list[Migration]]:
 134        """
 135        Return a dict of migration plans which will achieve the
 136        change from from_state to to_state. The dict has app labels
 137        as keys and a list of migrations as values.
 138
 139        The resulting migrations aren't specially named, but the names
 140        do matter for dependencies inside the set.
 141
 142        convert_packages is the list of packages to convert to use migrations
 143        (i.e. to make initial migrations for, in the usual case)
 144
 145        graph is an optional argument that, if provided, can help improve
 146        dependency generation and avoid potential circular dependencies.
 147        """
 148        # The first phase is generating all the operations for each app
 149        # and gathering them into a big per-app list.
 150        # Then go through that list, order it, and split into migrations to
 151        # resolve dependencies caused by M2Ms and FKs.
 152        self.generated_operations = {}
 153        self.altered_indexes = {}
 154        self.altered_constraints = {}
 155        self.renamed_fields = {}
 156
 157        # Prepare some old/new state and model lists, ignoring unmigrated packages.
 158        self.old_model_keys = set()
 159        self.new_model_keys = set()
 160        for (package_label, model_name), model_state in self.from_state.models.items():
 161            if package_label not in self.from_state.real_packages:
 162                self.old_model_keys.add((package_label, model_name))
 163
 164        for (package_label, model_name), model_state in self.to_state.models.items():
 165            if package_label not in self.from_state.real_packages or (
 166                convert_packages and package_label in convert_packages
 167            ):
 168                self.new_model_keys.add((package_label, model_name))
 169
 170        self.from_state.resolve_fields_and_relations()
 171        self.to_state.resolve_fields_and_relations()
 172
 173        # Renames have to come first
 174        self.generate_renamed_models()
 175
 176        # Prepare lists of fields and generate through model map
 177        self._prepare_field_lists()
 178        self._generate_through_model_map()
 179
 180        # Generate non-rename model operations
 181        self.generate_deleted_models()
 182        self.generate_created_models()
 183        self.generate_altered_options()
 184
 185        # Create the renamed fields and store them in self.renamed_fields.
 186        # They are used by create_altered_indexes(), generate_altered_fields(),
 187        # generate_removed_altered_index(), and
 188        # generate_altered_index().
 189        self.create_renamed_fields()
 190        # Create the altered indexes and store them in self.altered_indexes.
 191        # This avoids the same computation in generate_removed_indexes()
 192        # and generate_added_indexes().
 193        self.create_altered_indexes()
 194        self.create_altered_constraints()
 195        # Generate index removal operations before field is removed
 196        self.generate_removed_constraints()
 197        self.generate_removed_indexes()
 198        # Generate field renaming operations.
 199        self.generate_renamed_fields()
 200        self.generate_renamed_indexes()
 201        # Generate field operations.
 202        self.generate_removed_fields()
 203        self.generate_added_fields()
 204        self.generate_altered_fields()
 205        self.generate_added_indexes()
 206        self.generate_added_constraints()
 207        self.generate_altered_db_table()
 208
 209        self._sort_migrations()
 210        self._build_migration_list(graph)
 211        self._optimize_migrations()
 212
 213        return self.migrations
 214
 215    def _prepare_field_lists(self) -> None:
 216        """
 217        Prepare field lists and a list of the fields that used through models
 218        in the old state so dependencies can be made from the through model
 219        deletion to the field that uses it.
 220        """
 221        self.kept_model_keys = self.old_model_keys & self.new_model_keys
 222        self.through_users = {}
 223        self.old_field_keys = {
 224            (package_label, model_name, field_name)
 225            for package_label, model_name in self.kept_model_keys
 226            for field_name in self.from_state.models[
 227                package_label,
 228                self.renamed_models.get((package_label, model_name), model_name),
 229            ].fields
 230        }
 231        self.new_field_keys = {
 232            (package_label, model_name, field_name)
 233            for package_label, model_name in self.kept_model_keys
 234            for field_name in self.to_state.models[package_label, model_name].fields
 235        }
 236
 237    def _generate_through_model_map(self) -> None:
 238        """Through model map generation."""
 239        for package_label, model_name in sorted(self.old_model_keys):
 240            old_model_name = self.renamed_models.get(
 241                (package_label, model_name), model_name
 242            )
 243            old_model_state = self.from_state.models[package_label, old_model_name]
 244            for field_name, field in old_model_state.fields.items():
 245                if hasattr(field, "remote_field") and getattr(
 246                    field.remote_field, "through", None
 247                ):
 248                    through_key = resolve_relation(
 249                        field.remote_field.through, package_label, model_name
 250                    )
 251                    self.through_users[through_key] = (
 252                        package_label,
 253                        old_model_name,
 254                        field_name,
 255                    )
 256
 257    @staticmethod
 258    def _resolve_dependency(
 259        dependency: tuple[str, str, str | None, bool | str],
 260    ) -> tuple[tuple[str, str, str | None, bool | str], bool]:
 261        """
 262        Return the resolved dependency and a boolean denoting whether or not
 263        it was a settings dependency.
 264        """
 265        if not isinstance(dependency, SettingsTuple):
 266            return dependency, False
 267        resolved_package_label, resolved_object_name = getattr(
 268            settings, dependency[1]
 269        ).split(".")
 270        return (resolved_package_label, resolved_object_name.lower()) + dependency[
 271            2:
 272        ], True
 273
 274    def _build_migration_list(self, graph: MigrationGraph | None = None) -> None:
 275        """
 276        Chop the lists of operations up into migrations with dependencies on
 277        each other. Do this by going through an app's list of operations until
 278        one is found that has an outgoing dependency that isn't in another
 279        app's migration yet (hasn't been chopped off its list). Then chop off
 280        the operations before it into a migration and move onto the next app.
 281        If the loops completes without doing anything, there's a circular
 282        dependency (which _should_ be impossible as the operations are
 283        all split at this point so they can't depend and be depended on).
 284        """
 285        self.migrations = {}
 286        num_ops = sum(len(x) for x in self.generated_operations.values())
 287        chop_mode = False
 288        while num_ops:
 289            # On every iteration, we step through all the packages and see if there
 290            # is a completed set of operations.
 291            # If we find that a subset of the operations are complete we can
 292            # try to chop it off from the rest and continue, but we only
 293            # do this if we've already been through the list once before
 294            # without any chopping and nothing has changed.
 295            for package_label in sorted(self.generated_operations):
 296                chopped = []
 297                dependencies = set()
 298                for operation in list(self.generated_operations[package_label]):
 299                    deps_satisfied = True
 300                    operation_dependencies = set()
 301                    for dep in operation._auto_deps:
 302                        # Temporarily resolve the settings dependency to
 303                        # prevent circular references. While keeping the
 304                        # dependency checks on the resolved model, add the
 305                        # settings dependencies.
 306                        original_dep = dep
 307                        dep, is_settings_dep = self._resolve_dependency(dep)
 308                        if dep[0] != package_label:
 309                            # External app dependency. See if it's not yet
 310                            # satisfied.
 311                            for other_operation in self.generated_operations.get(
 312                                dep[0], []
 313                            ):
 314                                if self.check_dependency(other_operation, dep):
 315                                    deps_satisfied = False
 316                                    break
 317                            if not deps_satisfied:
 318                                break
 319                            else:
 320                                if is_settings_dep:
 321                                    operation_dependencies.add(
 322                                        (original_dep[0], original_dep[1])
 323                                    )
 324                                elif dep[0] in self.migrations:
 325                                    operation_dependencies.add(
 326                                        (dep[0], self.migrations[dep[0]][-1].name)
 327                                    )
 328                                else:
 329                                    # If we can't find the other app, we add a
 330                                    # first/last dependency, but only if we've
 331                                    # already been through once and checked
 332                                    # everything.
 333                                    if chop_mode:
 334                                        # If the app already exists, we add a
 335                                        # dependency on the last migration, as
 336                                        # we don't know which migration
 337                                        # contains the target field. If it's
 338                                        # not yet migrated or has no
 339                                        # migrations, we use __first__.
 340                                        if graph and graph.leaf_nodes(dep[0]):
 341                                            operation_dependencies.add(
 342                                                graph.leaf_nodes(dep[0])[0]
 343                                            )
 344                                        else:
 345                                            operation_dependencies.add(
 346                                                (dep[0], "__first__")
 347                                            )
 348                                    else:
 349                                        deps_satisfied = False
 350                    if deps_satisfied:
 351                        chopped.append(operation)
 352                        dependencies.update(operation_dependencies)
 353                        del self.generated_operations[package_label][0]
 354                    else:
 355                        break
 356                # Make a migration! Well, only if there's stuff to put in it
 357                if dependencies or chopped:
 358                    if not self.generated_operations[package_label] or chop_mode:
 359                        subclass = type(
 360                            "Migration",
 361                            (Migration,),
 362                            {"operations": [], "dependencies": []},
 363                        )
 364                        instance = subclass(
 365                            "auto_%i"  # noqa: UP031
 366                            % (len(self.migrations.get(package_label, [])) + 1),
 367                            package_label,
 368                        )
 369                        instance.dependencies = list(dependencies)
 370                        instance.operations = chopped
 371                        instance.initial = package_label not in self.existing_packages
 372                        self.migrations.setdefault(package_label, []).append(instance)
 373                        chop_mode = False
 374                    else:
 375                        self.generated_operations[package_label] = (
 376                            chopped + self.generated_operations[package_label]
 377                        )
 378            new_num_ops = sum(len(x) for x in self.generated_operations.values())
 379            if new_num_ops == num_ops:
 380                if not chop_mode:
 381                    chop_mode = True
 382                else:
 383                    raise ValueError(
 384                        f"Cannot resolve operation dependencies: {self.generated_operations!r}"
 385                    )
 386            num_ops = new_num_ops
 387
 388    def _sort_migrations(self) -> None:
 389        """
 390        Reorder to make things possible. Reordering may be needed so FKs work
 391        nicely inside the same app.
 392        """
 393        for package_label, ops in sorted(self.generated_operations.items()):
 394            ts = TopologicalSorter()
 395            for op in ops:
 396                ts.add(op)
 397                for dep in op._auto_deps:
 398                    # Resolve intra-app dependencies to handle circular
 399                    # references involving a settings model.
 400                    dep = self._resolve_dependency(dep)[0]
 401                    if dep[0] != package_label:
 402                        continue
 403                    ts.add(op, *(x for x in ops if self.check_dependency(x, dep)))
 404            self.generated_operations[package_label] = list(ts.static_order())
 405
 406    def _optimize_migrations(self) -> None:
 407        # Add in internal dependencies among the migrations
 408        for package_label, migrations in self.migrations.items():
 409            for m1, m2 in zip(migrations, migrations[1:]):
 410                m2.dependencies.append((package_label, m1.name))
 411
 412        # De-dupe dependencies
 413        for migrations in self.migrations.values():
 414            for migration in migrations:
 415                migration.dependencies = list(set(migration.dependencies))
 416
 417        # Optimize migrations
 418        for package_label, migrations in self.migrations.items():
 419            for migration in migrations:
 420                migration.operations = MigrationOptimizer().optimize(
 421                    migration.operations, package_label
 422                )
 423
 424    def check_dependency(
 425        self, operation: Operation, dependency: tuple[str, str, str | None, bool | str]
 426    ) -> bool:
 427        """
 428        Return True if the given operation depends on the given dependency,
 429        False otherwise.
 430        """
 431        # Created model
 432        if dependency[2] is None and dependency[3] is True:
 433            return (
 434                isinstance(operation, operations.CreateModel)
 435                and operation.name_lower == dependency[1].lower()
 436            )
 437        # Created field
 438        elif dependency[2] is not None and dependency[3] is True:
 439            return (
 440                isinstance(operation, operations.CreateModel)
 441                and operation.name_lower == dependency[1].lower()
 442                and any(dependency[2] == x for x, y in operation.fields)
 443            ) or (
 444                isinstance(operation, operations.AddField)
 445                and operation.model_name_lower == dependency[1].lower()
 446                and operation.name_lower == dependency[2].lower()
 447            )
 448        # Removed field
 449        elif dependency[2] is not None and dependency[3] is False:
 450            return (
 451                isinstance(operation, operations.RemoveField)
 452                and operation.model_name_lower == dependency[1].lower()
 453                and operation.name_lower == dependency[2].lower()
 454            )
 455        # Removed model
 456        elif dependency[2] is None and dependency[3] is False:
 457            return (
 458                isinstance(operation, operations.DeleteModel)
 459                and operation.name_lower == dependency[1].lower()
 460            )
 461        # Field being altered
 462        elif dependency[2] is not None and dependency[3] == "alter":
 463            return (
 464                isinstance(operation, operations.AlterField)
 465                and operation.model_name_lower == dependency[1].lower()
 466                and operation.name_lower == dependency[2].lower()
 467            )
 468        # Unknown dependency. Raise an error.
 469        else:
 470            raise ValueError(f"Can't handle dependency {dependency!r}")
 471
 472    def add_operation(
 473        self,
 474        package_label: str,
 475        operation: Operation,
 476        dependencies: list[tuple[str, str, str | None, bool | str]] | None = None,
 477        beginning: bool = False,
 478    ) -> None:
 479        # Operation dependencies are 4-element tuples:
 480        # (package_label, model_name, field_name, create/delete as True/False or "alter")
 481        operation._auto_deps = dependencies or []
 482        if beginning:
 483            self.generated_operations.setdefault(package_label, []).insert(0, operation)
 484        else:
 485            self.generated_operations.setdefault(package_label, []).append(operation)
 486
 487    def generate_renamed_models(self) -> None:
 488        """
 489        Find any renamed models, generate the operations for them, and remove
 490        the old entry from the model lists. Must be run before other
 491        model-level generation.
 492        """
 493        self.renamed_models = {}
 494        self.renamed_models_rel = {}
 495        added_models = self.new_model_keys - self.old_model_keys
 496        for package_label, model_name in sorted(added_models):
 497            model_state = self.to_state.models[package_label, model_name]
 498            model_fields_def = self.only_relation_agnostic_fields(model_state.fields)
 499
 500            removed_models = self.old_model_keys - self.new_model_keys
 501            for rem_package_label, rem_model_name in removed_models:
 502                if rem_package_label == package_label:
 503                    rem_model_state = self.from_state.models[
 504                        rem_package_label, rem_model_name
 505                    ]
 506                    rem_model_fields_def = self.only_relation_agnostic_fields(
 507                        rem_model_state.fields
 508                    )
 509                    if model_fields_def == rem_model_fields_def:
 510                        if self.questioner.ask_rename_model(
 511                            rem_model_state, model_state
 512                        ):
 513                            dependencies = []
 514                            fields = list(model_state.fields.values()) + [
 515                                field.remote_field
 516                                for relations in self.to_state.relations[
 517                                    package_label, model_name
 518                                ].values()
 519                                for field in relations.values()
 520                                if isinstance(field, RelatedField)
 521                            ]
 522                            for field in fields:
 523                                if isinstance(field, RelatedField):
 524                                    dependencies.extend(
 525                                        self._get_dependencies_for_foreign_key(
 526                                            package_label,
 527                                            model_name,
 528                                            field,
 529                                            self.to_state,
 530                                        )
 531                                    )
 532                            self.add_operation(
 533                                package_label,
 534                                operations.RenameModel(
 535                                    old_name=rem_model_state.name,
 536                                    new_name=model_state.name,
 537                                ),
 538                                dependencies=dependencies,
 539                            )
 540                            self.renamed_models[package_label, model_name] = (
 541                                rem_model_name
 542                            )
 543                            renamed_models_rel_key = f"{rem_model_state.package_label}.{rem_model_state.name_lower}"
 544                            self.renamed_models_rel[renamed_models_rel_key] = (
 545                                f"{model_state.package_label}.{model_state.name_lower}"
 546                            )
 547                            self.old_model_keys.remove(
 548                                (rem_package_label, rem_model_name)
 549                            )
 550                            self.old_model_keys.add((package_label, model_name))
 551                            break
 552
 553    def generate_created_models(self) -> None:
 554        """
 555        Find all new models and make create
 556        operations for them as well as separate operations to create any
 557        foreign key or M2M relationships (these are optimized later, if
 558        possible).
 559
 560        Defer any model options that refer to collections of fields that might
 561        be deferred.
 562        """
 563        added_models = self.new_model_keys - self.old_model_keys
 564
 565        for package_label, model_name in added_models:
 566            model_state = self.to_state.models[package_label, model_name]
 567            # Gather related fields
 568            related_fields = {}
 569            primary_key_rel = None
 570            for field_name, field in model_state.fields.items():
 571                if isinstance(field, RelatedField):
 572                    if field.remote_field.model:
 573                        if field.primary_key:
 574                            primary_key_rel = field.remote_field.model
 575                        else:
 576                            related_fields[field_name] = field
 577                    if isinstance(field.remote_field, ManyToManyRel):
 578                        related_fields[field_name] = field
 579
 580            # Are there indexes to defer?
 581            indexes = model_state.options.pop("indexes")
 582            constraints = model_state.options.pop("constraints")
 583            # Depend on the deletion of any possible proxy version of us
 584            dependencies = [
 585                (package_label, model_name, None, False),
 586            ]
 587            # Depend on all bases
 588            for base in model_state.bases:
 589                if isinstance(base, str) and "." in base:
 590                    base_package_label, base_name = base.split(".", 1)
 591                    dependencies.append((base_package_label, base_name, None, True))
 592                    # Depend on the removal of base fields if the new model has
 593                    # a field with the same name.
 594                    old_base_model_state = self.from_state.models.get(
 595                        (base_package_label, base_name)
 596                    )
 597                    new_base_model_state = self.to_state.models.get(
 598                        (base_package_label, base_name)
 599                    )
 600                    if old_base_model_state and new_base_model_state:
 601                        removed_base_fields = (
 602                            set(old_base_model_state.fields)
 603                            .difference(
 604                                new_base_model_state.fields,
 605                            )
 606                            .intersection(model_state.fields)
 607                        )
 608                        for removed_base_field in removed_base_fields:
 609                            dependencies.append(
 610                                (
 611                                    base_package_label,
 612                                    base_name,
 613                                    removed_base_field,
 614                                    False,
 615                                )
 616                            )
 617            # Depend on the other end of the primary key if it's a relation
 618            if primary_key_rel:
 619                dependencies.append(
 620                    resolve_relation(
 621                        primary_key_rel,
 622                        package_label,
 623                        model_name,
 624                    )
 625                    + (None, True)
 626                )
 627            # Generate creation operation
 628            self.add_operation(
 629                package_label,
 630                operations.CreateModel(
 631                    name=model_state.name,
 632                    fields=[
 633                        d
 634                        for d in model_state.fields.items()
 635                        if d[0] not in related_fields
 636                    ],
 637                    options=model_state.options,
 638                    bases=model_state.bases,
 639                ),
 640                dependencies=dependencies,
 641                beginning=True,
 642            )
 643
 644            # Generate operations for each related field
 645            for name, field in sorted(related_fields.items()):
 646                dependencies = self._get_dependencies_for_foreign_key(
 647                    package_label,
 648                    model_name,
 649                    field,
 650                    self.to_state,
 651                )
 652                # Depend on our own model being created
 653                dependencies.append((package_label, model_name, None, True))
 654                # Make operation
 655                self.add_operation(
 656                    package_label,
 657                    operations.AddField(
 658                        model_name=model_name,
 659                        name=name,
 660                        field=field,
 661                    ),
 662                    dependencies=list(set(dependencies)),
 663                )
 664
 665            related_dependencies = [
 666                (package_label, model_name, name, True)
 667                for name in sorted(related_fields)
 668            ]
 669            related_dependencies.append((package_label, model_name, None, True))
 670            for index in indexes:
 671                self.add_operation(
 672                    package_label,
 673                    operations.AddIndex(
 674                        model_name=model_name,
 675                        index=index,
 676                    ),
 677                    dependencies=related_dependencies,
 678                )
 679            for constraint in constraints:
 680                self.add_operation(
 681                    package_label,
 682                    operations.AddConstraint(
 683                        model_name=model_name,
 684                        constraint=constraint,
 685                    ),
 686                    dependencies=related_dependencies,
 687                )
 688
 689    def generate_deleted_models(self) -> None:
 690        """
 691        Find all deleted models and make delete
 692        operations for them as well as separate operations to delete any
 693        foreign key or M2M relationships (these are optimized later, if
 694        possible).
 695
 696        Also bring forward removal of any model options that refer to
 697        collections of fields - the inverse of generate_created_models().
 698        """
 699        deleted_models = self.old_model_keys - self.new_model_keys
 700
 701        for package_label, model_name in sorted(deleted_models):
 702            model_state = self.from_state.models[package_label, model_name]
 703            # Gather related fields
 704            related_fields = {}
 705            for field_name, field in model_state.fields.items():
 706                if isinstance(field, RelatedField):
 707                    if field.remote_field.model:
 708                        related_fields[field_name] = field
 709                    if isinstance(field.remote_field, ManyToManyRel):
 710                        related_fields[field_name] = field
 711
 712            # Then remove each related field
 713            for name in sorted(related_fields):
 714                self.add_operation(
 715                    package_label,
 716                    operations.RemoveField(
 717                        model_name=model_name,
 718                        name=name,
 719                    ),
 720                )
 721            # Finally, remove the model.
 722            # This depends on both the removal/alteration of all incoming fields
 723            # and the removal of all its own related fields, and if it's
 724            # a through model the field that references it.
 725            dependencies = []
 726            relations = self.from_state.relations
 727            for (
 728                related_object_package_label,
 729                object_name,
 730            ), relation_related_fields in relations[package_label, model_name].items():
 731                for field_name, field in relation_related_fields.items():
 732                    dependencies.append(
 733                        (related_object_package_label, object_name, field_name, False),
 734                    )
 735                    if not isinstance(field, ManyToManyField):
 736                        dependencies.append(
 737                            (
 738                                related_object_package_label,
 739                                object_name,
 740                                field_name,
 741                                "alter",
 742                            ),
 743                        )
 744
 745            for name in sorted(related_fields):
 746                dependencies.append((package_label, model_name, name, False))
 747            # We're referenced in another field's through=
 748            through_user = self.through_users.get(
 749                (package_label, model_state.name_lower)
 750            )
 751            if through_user:
 752                dependencies.append(
 753                    (through_user[0], through_user[1], through_user[2], False)
 754                )
 755            # Finally, make the operation, deduping any dependencies
 756            self.add_operation(
 757                package_label,
 758                operations.DeleteModel(
 759                    name=model_state.name,
 760                ),
 761                dependencies=list(set(dependencies)),
 762            )
 763
 764    def create_renamed_fields(self) -> None:
 765        """Work out renamed fields."""
 766        self.renamed_operations = []
 767        old_field_keys = self.old_field_keys.copy()
 768        for package_label, model_name, field_name in sorted(
 769            self.new_field_keys - old_field_keys
 770        ):
 771            old_model_name = self.renamed_models.get(
 772                (package_label, model_name), model_name
 773            )
 774            old_model_state = self.from_state.models[package_label, old_model_name]
 775            new_model_state = self.to_state.models[package_label, model_name]
 776            field = new_model_state.get_field(field_name)
 777            # Scan to see if this is actually a rename!
 778            field_dec = self.deep_deconstruct(field)
 779            for rem_package_label, rem_model_name, rem_field_name in sorted(
 780                old_field_keys - self.new_field_keys
 781            ):
 782                if rem_package_label == package_label and rem_model_name == model_name:
 783                    old_field = old_model_state.get_field(rem_field_name)
 784                    old_field_dec = self.deep_deconstruct(old_field)
 785                    if (
 786                        isinstance(field, RelatedField)
 787                        and field.remote_field
 788                        and field.remote_field.model
 789                        and "to" in old_field_dec[2]
 790                    ):
 791                        old_rel_to = old_field_dec[2]["to"]
 792                        if old_rel_to in self.renamed_models_rel:
 793                            old_field_dec[2]["to"] = self.renamed_models_rel[old_rel_to]
 794                    old_field.set_attributes_from_name(rem_field_name)
 795                    if old_field_dec == field_dec:
 796                        if self.questioner.ask_rename(
 797                            model_name, rem_field_name, field_name, field
 798                        ):
 799                            self.renamed_operations.append(
 800                                (
 801                                    rem_package_label,
 802                                    rem_model_name,
 803                                    rem_field_name,
 804                                    package_label,
 805                                    model_name,
 806                                    field,
 807                                    field_name,
 808                                )
 809                            )
 810                            old_field_keys.remove(
 811                                (rem_package_label, rem_model_name, rem_field_name)
 812                            )
 813                            old_field_keys.add((package_label, model_name, field_name))
 814                            self.renamed_fields[
 815                                package_label, model_name, field_name
 816                            ] = rem_field_name
 817                            break
 818
 819    def generate_renamed_fields(self) -> None:
 820        """Generate RenameField operations."""
 821        for (
 822            rem_package_label,
 823            rem_model_name,
 824            rem_field_name,
 825            package_label,
 826            model_name,
 827            field,
 828            field_name,
 829        ) in self.renamed_operations:
 830            self.add_operation(
 831                package_label,
 832                operations.RenameField(
 833                    model_name=model_name,
 834                    old_name=rem_field_name,
 835                    new_name=field_name,
 836                ),
 837            )
 838            self.old_field_keys.remove(
 839                (rem_package_label, rem_model_name, rem_field_name)
 840            )
 841            self.old_field_keys.add((package_label, model_name, field_name))
 842
 843    def generate_added_fields(self) -> None:
 844        """Make AddField operations."""
 845        for package_label, model_name, field_name in sorted(
 846            self.new_field_keys - self.old_field_keys
 847        ):
 848            self._generate_added_field(package_label, model_name, field_name)
 849
 850    def _generate_added_field(
 851        self, package_label: str, model_name: str, field_name: str
 852    ) -> None:
 853        field = self.to_state.models[package_label, model_name].get_field(field_name)
 854        # Adding a field always depends at least on its removal.
 855        dependencies = [(package_label, model_name, field_name, False)]
 856        # Fields that are foreignkeys/m2ms depend on stuff.
 857        if isinstance(field, RelatedField) and field.remote_field.model:
 858            dependencies.extend(
 859                self._get_dependencies_for_foreign_key(
 860                    package_label,
 861                    model_name,
 862                    field,
 863                    self.to_state,
 864                )
 865            )
 866        # You can't just add NOT NULL fields with no default or fields
 867        # which don't allow empty strings as default.
 868        time_fields = (DateField, DateTimeField, TimeField)
 869        preserve_default = (
 870            field.allow_null
 871            or field.has_default()
 872            or isinstance(field, ManyToManyField)
 873            or (not field.required and field.empty_strings_allowed)
 874            or (isinstance(field, time_fields) and field.auto_now)
 875        )
 876        if not preserve_default:
 877            field = field.clone()
 878            if isinstance(field, time_fields) and field.auto_now_add:
 879                field.default = self.questioner.ask_auto_now_add_addition(
 880                    field_name, model_name
 881                )
 882            else:
 883                field.default = self.questioner.ask_not_null_addition(
 884                    field_name, model_name
 885                )
 886        if (
 887            field.primary_key
 888            and field.default is not NOT_PROVIDED
 889            and callable(field.default)
 890        ):
 891            self.questioner.ask_unique_callable_default_addition(field_name, model_name)
 892        self.add_operation(
 893            package_label,
 894            operations.AddField(
 895                model_name=model_name,
 896                name=field_name,
 897                field=field,
 898                preserve_default=preserve_default,
 899            ),
 900            dependencies=dependencies,
 901        )
 902
 903    def generate_removed_fields(self) -> None:
 904        """Make RemoveField operations."""
 905        for package_label, model_name, field_name in sorted(
 906            self.old_field_keys - self.new_field_keys
 907        ):
 908            self._generate_removed_field(package_label, model_name, field_name)
 909
 910    def _generate_removed_field(
 911        self, package_label: str, model_name: str, field_name: str
 912    ) -> None:
 913        self.add_operation(
 914            package_label,
 915            operations.RemoveField(
 916                model_name=model_name,
 917                name=field_name,
 918            ),
 919        )
 920
 921    def generate_altered_fields(self) -> None:
 922        """
 923        Make AlterField operations, or possibly RemovedField/AddField if alter
 924        isn't possible.
 925        """
 926        for package_label, model_name, field_name in sorted(
 927            self.old_field_keys & self.new_field_keys
 928        ):
 929            # Did the field change?
 930            old_model_name = self.renamed_models.get(
 931                (package_label, model_name), model_name
 932            )
 933            old_field_name = self.renamed_fields.get(
 934                (package_label, model_name, field_name), field_name
 935            )
 936            old_field = self.from_state.models[package_label, old_model_name].get_field(
 937                old_field_name
 938            )
 939            new_field = self.to_state.models[package_label, model_name].get_field(
 940                field_name
 941            )
 942            dependencies = []
 943            # Implement any model renames on relations; these are handled by RenameModel
 944            # so we need to exclude them from the comparison
 945            if hasattr(new_field, "remote_field") and getattr(
 946                new_field.remote_field, "model", None
 947            ):
 948                rename_key = resolve_relation(
 949                    new_field.remote_field.model, package_label, model_name
 950                )
 951                if rename_key in self.renamed_models:
 952                    new_field.remote_field.model = old_field.remote_field.model
 953                # Handle ForeignKeyField which can only have a single to_field.
 954                remote_field_name = getattr(new_field.remote_field, "field_name", None)
 955                if remote_field_name:
 956                    to_field_rename_key = rename_key + (remote_field_name,)
 957                    if to_field_rename_key in self.renamed_fields:
 958                        # Repoint model name only
 959                        new_field.remote_field.model = old_field.remote_field.model
 960                dependencies.extend(
 961                    self._get_dependencies_for_foreign_key(
 962                        package_label,
 963                        model_name,
 964                        new_field,
 965                        self.to_state,
 966                    )
 967                )
 968            if hasattr(new_field, "remote_field") and getattr(
 969                new_field.remote_field, "through", None
 970            ):
 971                rename_key = resolve_relation(
 972                    new_field.remote_field.through, package_label, model_name
 973                )
 974                if rename_key in self.renamed_models:
 975                    new_field.remote_field.through = old_field.remote_field.through
 976            old_field_dec = self.deep_deconstruct(old_field)
 977            new_field_dec = self.deep_deconstruct(new_field)
 978            if old_field_dec != new_field_dec and old_field_name == field_name:
 979                both_m2m = isinstance(old_field, ManyToManyField) and isinstance(
 980                    new_field, ManyToManyField
 981                )
 982                neither_m2m = not isinstance(
 983                    old_field, ManyToManyField
 984                ) and not isinstance(new_field, ManyToManyField)
 985                if both_m2m or neither_m2m:
 986                    # Either both fields are m2m or neither is
 987                    preserve_default = True
 988                    if (
 989                        old_field.allow_null
 990                        and not new_field.allow_null
 991                        and not new_field.has_default()
 992                        and not isinstance(new_field, ManyToManyField)
 993                    ):
 994                        field = new_field.clone()
 995                        new_default = self.questioner.ask_not_null_alteration(
 996                            field_name, model_name
 997                        )
 998                        if new_default is not NOT_PROVIDED:
 999                            field.default = new_default
1000                            preserve_default = False
1001                    else:
1002                        field = new_field
1003                    self.add_operation(
1004                        package_label,
1005                        operations.AlterField(
1006                            model_name=model_name,
1007                            name=field_name,
1008                            field=field,
1009                            preserve_default=preserve_default,
1010                        ),
1011                        dependencies=dependencies,
1012                    )
1013                else:
1014                    # We cannot alter between m2m and concrete fields
1015                    self._generate_removed_field(package_label, model_name, field_name)
1016                    self._generate_added_field(package_label, model_name, field_name)
1017
1018    def create_altered_indexes(self) -> None:
1019        option_name = operations.AddIndex.option_name
1020
1021        for package_label, model_name in sorted(self.kept_model_keys):
1022            old_model_name = self.renamed_models.get(
1023                (package_label, model_name), model_name
1024            )
1025            old_model_state = self.from_state.models[package_label, old_model_name]
1026            new_model_state = self.to_state.models[package_label, model_name]
1027
1028            old_indexes = old_model_state.options[option_name]
1029            new_indexes = new_model_state.options[option_name]
1030            added_indexes = [idx for idx in new_indexes if idx not in old_indexes]
1031            removed_indexes = [idx for idx in old_indexes if idx not in new_indexes]
1032            renamed_indexes = []
1033            # Find renamed indexes.
1034            remove_from_added = []
1035            remove_from_removed = []
1036            for new_index in added_indexes:
1037                new_index_dec = new_index.deconstruct()
1038                new_index_name = new_index_dec[2].pop("name")
1039                for old_index in removed_indexes:
1040                    old_index_dec = old_index.deconstruct()
1041                    old_index_name = old_index_dec[2].pop("name")
1042                    # Indexes are the same except for the names.
1043                    if (
1044                        new_index_dec == old_index_dec
1045                        and new_index_name != old_index_name
1046                    ):
1047                        renamed_indexes.append((old_index_name, new_index_name, None))
1048                        remove_from_added.append(new_index)
1049                        remove_from_removed.append(old_index)
1050
1051            # Remove renamed indexes from the lists of added and removed
1052            # indexes.
1053            added_indexes = [
1054                idx for idx in added_indexes if idx not in remove_from_added
1055            ]
1056            removed_indexes = [
1057                idx for idx in removed_indexes if idx not in remove_from_removed
1058            ]
1059
1060            self.altered_indexes.update(
1061                {
1062                    (package_label, model_name): {
1063                        "added_indexes": added_indexes,
1064                        "removed_indexes": removed_indexes,
1065                        "renamed_indexes": renamed_indexes,
1066                    }
1067                }
1068            )
1069
1070    def generate_added_indexes(self) -> None:
1071        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1072            dependencies = self._get_dependencies_for_model(package_label, model_name)
1073            for index in alt_indexes["added_indexes"]:
1074                self.add_operation(
1075                    package_label,
1076                    operations.AddIndex(
1077                        model_name=model_name,
1078                        index=index,
1079                    ),
1080                    dependencies=dependencies,
1081                )
1082
1083    def generate_removed_indexes(self) -> None:
1084        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1085            for index in alt_indexes["removed_indexes"]:
1086                self.add_operation(
1087                    package_label,
1088                    operations.RemoveIndex(
1089                        model_name=model_name,
1090                        name=index.name,
1091                    ),
1092                )
1093
1094    def generate_renamed_indexes(self) -> None:
1095        for (package_label, model_name), alt_indexes in self.altered_indexes.items():
1096            for old_index_name, new_index_name, old_fields in alt_indexes[
1097                "renamed_indexes"
1098            ]:
1099                self.add_operation(
1100                    package_label,
1101                    operations.RenameIndex(
1102                        model_name=model_name,
1103                        new_name=new_index_name,
1104                        old_name=old_index_name,
1105                        old_fields=old_fields,
1106                    ),
1107                )
1108
1109    def create_altered_constraints(self) -> None:
1110        option_name = operations.AddConstraint.option_name
1111        for package_label, model_name in sorted(self.kept_model_keys):
1112            old_model_name = self.renamed_models.get(
1113                (package_label, model_name), model_name
1114            )
1115            old_model_state = self.from_state.models[package_label, old_model_name]
1116            new_model_state = self.to_state.models[package_label, model_name]
1117
1118            old_constraints = old_model_state.options[option_name]
1119            new_constraints = new_model_state.options[option_name]
1120            add_constraints = [c for c in new_constraints if c not in old_constraints]
1121            rem_constraints = [c for c in old_constraints if c not in new_constraints]
1122
1123            self.altered_constraints.update(
1124                {
1125                    (package_label, model_name): {
1126                        "added_constraints": add_constraints,
1127                        "removed_constraints": rem_constraints,
1128                    }
1129                }
1130            )
1131
1132    def generate_added_constraints(self) -> None:
1133        for (
1134            package_label,
1135            model_name,
1136        ), alt_constraints in self.altered_constraints.items():
1137            dependencies = self._get_dependencies_for_model(package_label, model_name)
1138            for constraint in alt_constraints["added_constraints"]:
1139                self.add_operation(
1140                    package_label,
1141                    operations.AddConstraint(
1142                        model_name=model_name,
1143                        constraint=constraint,
1144                    ),
1145                    dependencies=dependencies,
1146                )
1147
1148    def generate_removed_constraints(self) -> None:
1149        for (
1150            package_label,
1151            model_name,
1152        ), alt_constraints in self.altered_constraints.items():
1153            for constraint in alt_constraints["removed_constraints"]:
1154                self.add_operation(
1155                    package_label,
1156                    operations.RemoveConstraint(
1157                        model_name=model_name,
1158                        name=constraint.name,
1159                    ),
1160                )
1161
1162    @staticmethod
1163    def _get_dependencies_for_foreign_key(
1164        package_label: str, model_name: str, field: Field, project_state: ProjectState
1165    ) -> list[tuple[str, str, str | None, bool | str]]:
1166        remote_field_model = None
1167        if isinstance(field, RelatedField):
1168            remote_field_model = field.remote_field.model
1169        else:
1170            relations = project_state.relations[package_label, model_name]
1171            for (remote_package_label, remote_model_name), fields in relations.items():
1172                if any(
1173                    field == related_field.remote_field
1174                    for related_field in fields.values()
1175                    if isinstance(related_field, RelatedField)
1176                ):
1177                    remote_field_model = f"{remote_package_label}.{remote_model_name}"
1178                    break
1179        dep_package_label, dep_object_name = resolve_relation(
1180            remote_field_model,
1181            package_label,
1182            model_name,
1183        )
1184        dependencies = [(dep_package_label, dep_object_name, None, True)]
1185        if isinstance(field, RelatedField) and isinstance(
1186            field.remote_field, ManyToManyRel
1187        ):
1188            through_package_label, through_object_name = resolve_relation(
1189                field.remote_field.through,
1190                package_label,
1191                model_name,
1192            )
1193            dependencies.append(
1194                (through_package_label, through_object_name, None, True)
1195            )
1196        return dependencies
1197
1198    def _get_dependencies_for_model(
1199        self, package_label: str, model_name: str
1200    ) -> list[tuple[str, str, str | None, bool | str]]:
1201        """Return foreign key dependencies of the given model."""
1202        dependencies = []
1203        model_state = self.to_state.models[package_label, model_name]
1204        for field in model_state.fields.values():
1205            if isinstance(field, RelatedField):
1206                dependencies.extend(
1207                    self._get_dependencies_for_foreign_key(
1208                        package_label,
1209                        model_name,
1210                        field,
1211                        self.to_state,
1212                    )
1213                )
1214        return dependencies
1215
1216    def generate_altered_db_table(self) -> None:
1217        for package_label, model_name in sorted(self.kept_model_keys):
1218            old_model_name = self.renamed_models.get(
1219                (package_label, model_name), model_name
1220            )
1221            old_model_state = self.from_state.models[package_label, old_model_name]
1222            new_model_state = self.to_state.models[package_label, model_name]
1223            old_db_table_name = old_model_state.options.get("db_table")
1224            new_db_table_name = new_model_state.options.get("db_table")
1225            if old_db_table_name != new_db_table_name:
1226                self.add_operation(
1227                    package_label,
1228                    operations.AlterModelTable(
1229                        name=model_name,
1230                        table=new_db_table_name,
1231                    ),
1232                )
1233
1234    def generate_altered_options(self) -> None:
1235        """
1236        Work out if any non-schema-affecting options have changed and make an
1237        operation to represent them in state changes (in case Python code in
1238        migrations needs them).
1239        """
1240        for package_label, model_name in sorted(self.kept_model_keys):
1241            old_model_name = self.renamed_models.get(
1242                (package_label, model_name), model_name
1243            )
1244            old_model_state = self.from_state.models[package_label, old_model_name]
1245            new_model_state = self.to_state.models[package_label, model_name]
1246            old_options = {
1247                key: value
1248                for key, value in old_model_state.options.items()
1249                if key in AlterModelOptions.ALTER_OPTION_KEYS
1250            }
1251            new_options = {
1252                key: value
1253                for key, value in new_model_state.options.items()
1254                if key in AlterModelOptions.ALTER_OPTION_KEYS
1255            }
1256            if old_options != new_options:
1257                self.add_operation(
1258                    package_label,
1259                    operations.AlterModelOptions(
1260                        name=model_name,
1261                        options=new_options,
1262                    ),
1263                )
1264
1265    def arrange_for_graph(
1266        self,
1267        changes: dict[str, list[Migration]],
1268        graph: MigrationGraph,
1269        migration_name: str | None = None,
1270    ) -> dict[str, list[Migration]]:
1271        """
1272        Take a result from changes() and a MigrationGraph, and fix the names
1273        and dependencies of the changes so they extend the graph from the leaf
1274        nodes for each app.
1275        """
1276        leaves = graph.leaf_nodes()
1277        name_map = {}
1278        for package_label, migrations in list(changes.items()):
1279            if not migrations:
1280                continue
1281            # Find the app label's current leaf node
1282            app_leaf = None
1283            for leaf in leaves:
1284                if leaf[0] == package_label:
1285                    app_leaf = leaf
1286                    break
1287            # Do they want an initial migration for this app?
1288            if app_leaf is None and not self.questioner.ask_initial(package_label):
1289                # They don't.
1290                for migration in migrations:
1291                    name_map[(package_label, migration.name)] = (
1292                        package_label,
1293                        "__first__",
1294                    )
1295                del changes[package_label]
1296                continue
1297            # Work out the next number in the sequence
1298            if app_leaf is None:
1299                next_number = 1
1300            else:
1301                next_number = (self.parse_number(app_leaf[1]) or 0) + 1
1302            # Name each migration
1303            for i, migration in enumerate(migrations):
1304                if i == 0 and app_leaf:
1305                    migration.dependencies.append(app_leaf)
1306                new_name_parts = ["%04i" % next_number]  # noqa: UP031
1307                if migration_name:
1308                    new_name_parts.append(migration_name)
1309                elif i == 0 and not app_leaf:
1310                    new_name_parts.append("initial")
1311                else:
1312                    new_name_parts.append(migration.suggest_name()[:100])
1313                new_name = "_".join(new_name_parts)
1314                name_map[(package_label, migration.name)] = (package_label, new_name)
1315                next_number += 1
1316                migration.name = new_name
1317        # Now fix dependencies
1318        for migrations in changes.values():
1319            for migration in migrations:
1320                migration.dependencies = [
1321                    name_map.get(d, d) for d in migration.dependencies
1322                ]
1323        return changes
1324
1325    def _trim_to_packages(
1326        self, changes: dict[str, list[Migration]], package_labels: set[str]
1327    ) -> dict[str, list[Migration]]:
1328        """
1329        Take changes from arrange_for_graph() and set of app labels, and return
1330        a modified set of changes which trims out as many migrations that are
1331        not in package_labels as possible. Note that some other migrations may
1332        still be present as they may be required dependencies.
1333        """
1334        # Gather other app dependencies in a first pass
1335        app_dependencies = {}
1336        for package_label, migrations in changes.items():
1337            for migration in migrations:
1338                for dep_package_label, name in migration.dependencies:
1339                    app_dependencies.setdefault(package_label, set()).add(
1340                        dep_package_label
1341                    )
1342        required_packages = set(package_labels)
1343        # Keep resolving till there's no change
1344        old_required_packages = None
1345        while old_required_packages != required_packages:
1346            old_required_packages = set(required_packages)
1347            required_packages.update(
1348                *[
1349                    app_dependencies.get(package_label, ())
1350                    for package_label in required_packages
1351                ]
1352            )
1353        # Remove all migrations that aren't needed
1354        for package_label in list(changes):
1355            if package_label not in required_packages:
1356                del changes[package_label]
1357        return changes
1358
1359    @classmethod
1360    def parse_number(cls, name: str) -> int | None:
1361        """
1362        Given a migration name, try to extract a number from the beginning of
1363        it. For a squashed migration such as '0001_squashed_0004…', return the
1364        second number. If no number is found, return None.
1365        """
1366        if squashed_match := re.search(r".*_squashed_(\d+)", name):
1367            return int(squashed_match[1])
1368        match = re.match(r"^\d+", name)
1369        if match:
1370            return int(match[0])
1371        return None