1from __future__ import annotations
   2
   3import os
   4import sys
   5import time
   6from typing import TYPE_CHECKING, Any, cast
   7
   8import click
   9
  10from plain.cli import register_cli
  11from plain.cli.runtime import common_command
  12from plain.packages import packages_registry
  13from plain.runtime import settings
  14from plain.utils.text import Truncator
  15
  16from .. import migrations
  17from ..backups.core import DatabaseBackups
  18from ..db import db_connection as _db_connection
  19from ..migrations.autodetector import MigrationAutodetector
  20from ..migrations.executor import MigrationExecutor
  21from ..migrations.loader import AmbiguityError, MigrationLoader
  22from ..migrations.migration import Migration, SettingsTuple
  23from ..migrations.optimizer import MigrationOptimizer
  24from ..migrations.questioner import (
  25    InteractiveMigrationQuestioner,
  26    NonInteractiveMigrationQuestioner,
  27)
  28from ..migrations.recorder import MigrationRecorder
  29from ..migrations.state import ModelState, ProjectState
  30from ..migrations.writer import MigrationWriter
  31from ..registry import models_registry
  32
if TYPE_CHECKING:
    # These imports are only needed for annotations, so they are confined to
    # the type-checking branch and never executed at runtime.
    from ..backends.base.base import BaseDatabaseWrapper
    from ..migrations.operations.base import Operation

    # Present the imported connection to static type checkers as a
    # BaseDatabaseWrapper.
    db_connection = cast("BaseDatabaseWrapper", _db_connection)
else:
    # At runtime the imported connection object is used unchanged.
    db_connection = _db_connection
  40
  41
# Root click group for the migration subcommands defined below ("make",
# "apply", "list", "prune"). It is handed to register_cli under the name
# "migrations". The docstring is left terse because click uses it as the
# group's help text.
@register_cli("migrations")
@click.group()
def cli() -> None:
    """Database migration management"""
  46
  47
  48@common_command
  49@register_cli("makemigrations", shortcut_for="migrations make")
  50@cli.command("make")
  51@click.argument("package_labels", nargs=-1)
  52@click.option(
  53    "--dry-run",
  54    is_flag=True,
  55    help="Just show what migrations would be made; don't actually write them.",
  56)
  57@click.option("--empty", is_flag=True, help="Create an empty migration.")
  58@click.option(
  59    "--noinput",
  60    "--no-input",
  61    "no_input",
  62    is_flag=True,
  63    help="Tells Plain to NOT prompt the user for input of any kind.",
  64)
  65@click.option("-n", "--name", help="Use this name for migration file(s).")
  66@click.option(
  67    "--check",
  68    is_flag=True,
  69    help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.",
  70)
  71@click.option(
  72    "-v",
  73    "--verbosity",
  74    type=int,
  75    default=1,
  76    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
  77)
  78def make(
  79    package_labels: tuple[str, ...],
  80    dry_run: bool,
  81    empty: bool,
  82    no_input: bool,
  83    name: str | None,
  84    check: bool,
  85    verbosity: int,
  86) -> None:
  87    """Create new database migrations"""
  88
  89    written_files: list[str] = []
  90    interactive = not no_input
  91    migration_name = name
  92    check_changes = check
  93
  94    def log(msg: str, level: int = 1) -> None:
  95        if verbosity >= level:
  96            click.echo(msg)
  97
  98    def write_migration_files(
  99        changes: dict[str, list[Migration]],
 100        update_previous_migration_paths: dict[str, str] | None = None,
 101    ) -> None:
 102        """Take a changes dict and write them out as migration files."""
 103        directory_created = {}
 104        for package_label, package_migrations in changes.items():
 105            log(
 106                click.style(f"Migrations for '{package_label}':", fg="cyan", bold=True),
 107                level=1,
 108            )
 109            for migration in package_migrations:
 110                writer = MigrationWriter(migration)
 111                migration_string = os.path.relpath(writer.path)
 112                log(f"  {click.style(migration_string, fg='yellow')}\n", level=1)
 113                for operation in migration.operations:
 114                    log(f"    - {operation.describe()}", level=1)
 115
 116                if not dry_run:
 117                    migrations_directory = os.path.dirname(writer.path)
 118                    if not directory_created.get(package_label):
 119                        os.makedirs(migrations_directory, exist_ok=True)
 120                        init_path = os.path.join(migrations_directory, "__init__.py")
 121                        if not os.path.isfile(init_path):
 122                            open(init_path, "w").close()
 123                        directory_created[package_label] = True
 124
 125                    migration_string = writer.as_string()
 126                    with open(writer.path, "w", encoding="utf-8") as fh:
 127                        fh.write(migration_string)
 128                        written_files.append(writer.path)
 129
 130                    if update_previous_migration_paths:
 131                        prev_path = update_previous_migration_paths[package_label]
 132                        if writer.needs_manual_porting:
 133                            log(
 134                                click.style(
 135                                    f"Updated migration {migration_string} requires manual porting.\n"
 136                                    f"Previous migration {os.path.relpath(prev_path)} was kept and "
 137                                    f"must be deleted after porting functions manually.",
 138                                    fg="yellow",
 139                                ),
 140                                level=1,
 141                            )
 142                        else:
 143                            os.remove(prev_path)
 144                            log(f"Deleted {os.path.relpath(prev_path)}", level=1)
 145                elif verbosity >= 3:
 146                    log(
 147                        click.style(
 148                            f"Full migrations file '{writer.filename}':",
 149                            fg="cyan",
 150                            bold=True,
 151                        ),
 152                        level=3,
 153                    )
 154                    log(writer.as_string(), level=3)
 155
 156    # Validate package labels
 157    package_labels_set = set(package_labels)
 158    has_bad_labels = False
 159    for package_label in package_labels_set:
 160        try:
 161            packages_registry.get_package_config(package_label)
 162        except LookupError as err:
 163            click.echo(str(err), err=True)
 164            has_bad_labels = True
 165    if has_bad_labels:
 166        sys.exit(2)
 167
 168    # Load the current graph state
 169    loader = MigrationLoader(None, ignore_no_migrations=True)
 170
 171    # Raise an error if any migrations are applied before their dependencies.
 172    # Only the default db_connection is supported.
 173    loader.check_consistent_history(db_connection)
 174
 175    # Check for conflicts
 176    conflicts = loader.detect_conflicts()
 177    if package_labels_set:
 178        conflicts = {
 179            package_label: conflict
 180            for package_label, conflict in conflicts.items()
 181            if package_label in package_labels_set
 182        }
 183
 184    if conflicts:
 185        name_str = "; ".join(
 186            "{} in {}".format(", ".join(names), package)
 187            for package, names in conflicts.items()
 188        )
 189        raise click.ClickException(
 190            f"Conflicting migrations detected; multiple leaf nodes in the "
 191            f"migration graph: ({name_str})."
 192        )
 193
 194    # Set up questioner
 195    if interactive:
 196        questioner = InteractiveMigrationQuestioner(
 197            specified_packages=package_labels_set,
 198            dry_run=dry_run,
 199        )
 200    else:
 201        questioner = NonInteractiveMigrationQuestioner(
 202            specified_packages=package_labels_set,
 203            dry_run=dry_run,
 204            verbosity=verbosity,
 205        )
 206
 207    # Set up autodetector
 208    autodetector = MigrationAutodetector(
 209        loader.project_state(),
 210        ProjectState.from_models_registry(models_registry),
 211        questioner,
 212    )
 213
 214    # Handle empty migrations if requested
 215    if empty:
 216        if not package_labels_set:
 217            raise click.ClickException(
 218                "You must supply at least one package label when using --empty."
 219            )
 220        changes = {
 221            package: [Migration("custom", package)] for package in package_labels_set
 222        }
 223        changes = autodetector.arrange_for_graph(
 224            changes=changes,
 225            graph=loader.graph,
 226            migration_name=migration_name,
 227        )
 228        write_migration_files(changes)
 229        return
 230
 231    # Detect changes
 232    changes = autodetector.changes(
 233        graph=loader.graph,
 234        trim_to_packages=package_labels_set or None,
 235        convert_packages=package_labels_set or None,
 236        migration_name=migration_name,
 237    )
 238
 239    if not changes:
 240        log(
 241            "No changes detected"
 242            if not package_labels_set
 243            else f"No changes detected in {'package' if len(package_labels_set) == 1 else 'packages'} "
 244            f"'{', '.join(package_labels_set)}'",
 245            level=1,
 246        )
 247    else:
 248        if check_changes:
 249            sys.exit(1)
 250
 251        write_migration_files(changes)
 252
 253
 254@common_command
 255@register_cli("migrate", shortcut_for="migrations apply")
 256@cli.command("apply")
 257@click.argument("package_label", required=False)
 258@click.argument("migration_name", required=False)
 259@click.option(
 260    "--fake", is_flag=True, help="Mark migrations as run without actually running them."
 261)
 262@click.option(
 263    "--plan",
 264    is_flag=True,
 265    help="Shows a list of the migration actions that will be performed.",
 266)
 267@click.option(
 268    "--check",
 269    "check_unapplied",
 270    is_flag=True,
 271    help="Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.",
 272)
 273@click.option(
 274    "--backup/--no-backup",
 275    "backup",
 276    is_flag=True,
 277    default=None,
 278    help="Explicitly enable/disable pre-migration backups.",
 279)
 280@click.option(
 281    "--no-input",
 282    "--noinput",
 283    "no_input",
 284    is_flag=True,
 285    help="Tells Plain to NOT prompt the user for input of any kind.",
 286)
 287@click.option(
 288    "--atomic-batch/--no-atomic-batch",
 289    default=None,
 290    help="Run migrations in a single transaction (auto-detected by default)",
 291)
 292@click.option(
 293    "--quiet",
 294    is_flag=True,
 295    help="Suppress migration output (used for test database creation).",
 296)
 297def apply(
 298    package_label: str | None,
 299    migration_name: str | None,
 300    fake: bool,
 301    plan: bool,
 302    check_unapplied: bool,
 303    backup: bool | None,
 304    no_input: bool,
 305    atomic_batch: bool | None,
 306    quiet: bool,
 307) -> None:
 308    """Apply database migrations"""
 309
 310    def migration_progress_callback(
 311        action: str,
 312        *,
 313        migration: Migration | None = None,
 314        fake: bool = False,
 315        operation: Operation | None = None,
 316        sql_statements: list[str] | None = None,
 317    ) -> None:
 318        if quiet:
 319            return
 320
 321        if action == "apply_start":
 322            click.echo()  # Always add newline between migrations
 323            if fake:
 324                click.secho(f"{migration} (faked)", fg="cyan")
 325            else:
 326                click.secho(f"{migration}", fg="cyan")
 327        elif action == "apply_success":
 328            pass  # Already shown via operations
 329        elif action == "operation_start":
 330            click.echo(f"  {operation.describe()}", nl=False)
 331            click.secho("... ", dim=True, nl=False)
 332        elif action == "operation_success":
 333            # Show SQL statements (no OK needed, SQL implies success)
 334            if sql_statements:
 335                click.echo()  # newline after "..."
 336                for sql in sql_statements:
 337                    click.secho(f"    {sql}", dim=True)
 338            else:
 339                # No SQL: just add a newline
 340                click.echo()
 341
 342    def describe_operation(operation: Any) -> tuple[str, bool]:
 343        """Return a string that describes a migration operation for --plan."""
 344        prefix = ""
 345        is_error = False
 346        if hasattr(operation, "code"):
 347            code = operation.code
 348            action = (code.__doc__ or "") if code else None
 349        elif hasattr(operation, "sql"):
 350            action = operation.sql
 351        else:
 352            action = ""
 353        if action is not None:
 354            action = str(action).replace("\n", "")
 355        if action:
 356            action = " -> " + action
 357        truncated = Truncator(action)
 358        return prefix + operation.describe() + truncated.chars(40), is_error
 359
 360    # Get the database we're operating from
 361    # Hook for backends needing any database preparation
 362    db_connection.prepare_database()
 363
 364    # Work out which packages have migrations and which do not
 365    executor = MigrationExecutor(db_connection, migration_progress_callback)
 366
 367    # Raise an error if any migrations are applied before their dependencies.
 368    executor.loader.check_consistent_history(db_connection)
 369
 370    # Before anything else, see if there's conflicting packages and drop out
 371    # hard if there are any
 372    conflicts = executor.loader.detect_conflicts()
 373    if conflicts:
 374        name_str = "; ".join(
 375            "{} in {}".format(", ".join(names), package)
 376            for package, names in conflicts.items()
 377        )
 378        raise click.ClickException(
 379            "Conflicting migrations detected; multiple leaf nodes in the "
 380            f"migration graph: ({name_str})."
 381        )
 382
 383    # If they supplied command line arguments, work out what they mean.
 384    target_package_labels_only = True
 385    targets: list[tuple[str, str]]
 386    if package_label:
 387        try:
 388            packages_registry.get_package_config(package_label)
 389        except LookupError as err:
 390            raise click.ClickException(str(err))
 391
 392        if package_label not in executor.loader.migrated_packages:
 393            raise click.ClickException(
 394                f"Package '{package_label}' does not have migrations."
 395            )
 396
 397    if package_label and migration_name:
 398        try:
 399            migration = executor.loader.get_migration_by_prefix(
 400                package_label, migration_name
 401            )
 402        except AmbiguityError:
 403            raise click.ClickException(
 404                f"More than one migration matches '{migration_name}' in package '{package_label}'. "
 405                "Please be more specific."
 406            )
 407        except KeyError:
 408            raise click.ClickException(
 409                f"Cannot find a migration matching '{migration_name}' from package '{package_label}'."
 410            )
 411        target: tuple[str, str] = (package_label, migration.name)
 412        if (
 413            target not in executor.loader.graph.nodes
 414            and target in executor.loader.replacements
 415        ):
 416            incomplete_migration = executor.loader.replacements[target]
 417            target = incomplete_migration.replaces[-1]  # type: ignore[assignment]
 418        targets = [target]
 419        target_package_labels_only = False
 420    elif package_label:
 421        targets = [
 422            key for key in executor.loader.graph.leaf_nodes() if key[0] == package_label
 423        ]
 424    else:
 425        targets = list(executor.loader.graph.leaf_nodes())
 426
 427    migration_plan = executor.migration_plan(targets)
 428
 429    if plan:
 430        if not quiet:
 431            click.secho("Planned operations:", fg="cyan")
 432            if not migration_plan:
 433                click.echo("  No planned migration operations.")
 434            else:
 435                for migration in migration_plan:
 436                    click.secho(str(migration), fg="cyan")
 437                    for operation in migration.operations:
 438                        message, is_error = describe_operation(operation)
 439                        if is_error:
 440                            click.secho("    " + message, fg="yellow")
 441                        else:
 442                            click.echo("    " + message)
 443        if check_unapplied:
 444            sys.exit(1)
 445        return
 446
 447    if check_unapplied:
 448        if migration_plan:
 449            sys.exit(1)
 450        return
 451
 452    # Print some useful info
 453    if not quiet:
 454        if target_package_labels_only:
 455            packages = ", ".join(sorted({a for a, n in targets})) or "(none)"
 456            click.secho("Packages: ", bold=True, nl=False)
 457            click.secho(packages, dim=True)
 458            click.echo()  # Add newline after packages
 459        else:
 460            click.secho("Target: ", bold=True, nl=False)
 461            click.secho(f"{targets[0][1]} from {targets[0][0]}", dim=True)
 462            click.echo()  # Add newline after target
 463
 464    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)
 465
 466    if migration_plan:
 467        # Determine whether to use atomic batch
 468        use_atomic_batch = False
 469        atomic_batch_message = None
 470        if len(migration_plan) > 1:
 471            # Check database capabilities
 472            can_rollback_ddl = db_connection.features.can_rollback_ddl
 473
 474            # Check if all migrations support atomic
 475            non_atomic_migrations = [m for m in migration_plan if not m.atomic]
 476
 477            if atomic_batch is True:
 478                # User explicitly requested atomic batch
 479                if not can_rollback_ddl:
 480                    raise click.UsageError(
 481                        f"--atomic-batch not supported on {db_connection.vendor}. "
 482                        "Remove the flag or use a database that supports transactional DDL."
 483                    )
 484                if non_atomic_migrations:
 485                    names = ", ".join(
 486                        f"{m.package_label}.{m.name}" for m in non_atomic_migrations[:3]
 487                    )
 488                    if len(non_atomic_migrations) > 3:
 489                        names += f", and {len(non_atomic_migrations) - 3} more"
 490                    raise click.UsageError(
 491                        f"--atomic-batch requested but these migrations have atomic=False: {names}"
 492                    )
 493                use_atomic_batch = True
 494                atomic_batch_message = (
 495                    f"Running {len(migration_plan)} migrations in atomic batch"
 496                )
 497            elif atomic_batch is False:
 498                # User explicitly disabled atomic batch
 499                use_atomic_batch = False
 500                if len(migration_plan) > 1:
 501                    atomic_batch_message = (
 502                        f"Running {len(migration_plan)} migrations separately"
 503                    )
 504            else:
 505                # Auto-detect (atomic_batch is None)
 506                # SQLite is excluded because it requires foreign key constraints to be
 507                # disabled before entering a transaction, which conflicts with batch mode
 508                if (
 509                    can_rollback_ddl
 510                    and not non_atomic_migrations
 511                    and db_connection.vendor != "sqlite"
 512                ):
 513                    use_atomic_batch = True
 514                    atomic_batch_message = (
 515                        f"Running {len(migration_plan)} migrations in atomic batch"
 516                    )
 517                else:
 518                    use_atomic_batch = False
 519                    if len(migration_plan) > 1:
 520                        if not can_rollback_ddl:
 521                            atomic_batch_message = f"Running {len(migration_plan)} migrations separately ({db_connection.vendor} doesn't support batch)"
 522                        elif non_atomic_migrations:
 523                            atomic_batch_message = f"Running {len(migration_plan)} migrations separately (some have atomic=False)"
 524                        elif db_connection.vendor == "sqlite":
 525                            atomic_batch_message = f"Running {len(migration_plan)} migrations separately (SQLite doesn't support batch)"
 526                        else:
 527                            atomic_batch_message = (
 528                                f"Running {len(migration_plan)} migrations separately"
 529                            )
 530
 531        if backup or (backup is None and settings.DEBUG):
 532            backup_name = f"migrate_{time.strftime('%Y%m%d_%H%M%S')}"
 533            if not quiet:
 534                click.secho("Creating backup: ", bold=True, nl=False)
 535                click.secho(f"{backup_name}", dim=True, nl=False)
 536                click.secho("... ", dim=True, nl=False)
 537
 538            backups_handler = DatabaseBackups()
 539            backups_handler.create(
 540                backup_name,
 541                pg_dump=os.environ.get("PG_DUMP", "pg_dump"),
 542            )
 543
 544            if not quiet:
 545                click.echo(click.style("OK", fg="green"))
 546                click.echo()  # Add blank line after backup output
 547        else:
 548            if not quiet:
 549                click.echo()  # Add blank line after packages/target info
 550
 551        if not quiet:
 552            if atomic_batch_message:
 553                click.secho(
 554                    f"Applying migrations ({atomic_batch_message.lower()}):", bold=True
 555                )
 556            else:
 557                click.secho("Applying migrations:", bold=True)
 558        post_migrate_state = executor.migrate(
 559            targets,
 560            plan=migration_plan,
 561            state=pre_migrate_state.clone(),
 562            fake=fake,
 563            atomic_batch=use_atomic_batch,
 564        )
 565        # post_migrate signals have access to all models. Ensure that all models
 566        # are reloaded in case any are delayed.
 567        post_migrate_state.clear_delayed_models_cache()
 568        post_migrate_packages = post_migrate_state.models_registry
 569
 570        # Re-render models of real packages to include relationships now that
 571        # we've got a final state. This wouldn't be necessary if real packages
 572        # models were rendered with relationships in the first place.
 573        with post_migrate_packages.bulk_update():
 574            model_keys = []
 575            for model_state in post_migrate_packages.real_models:
 576                model_key = model_state.package_label, model_state.name_lower
 577                model_keys.append(model_key)
 578                post_migrate_packages.unregister_model(*model_key)
 579        post_migrate_packages.render_multiple(
 580            [
 581                ModelState.from_model(models_registry.get_model(*model))
 582                for model in model_keys
 583            ]
 584        )
 585
 586    else:
 587        if not quiet:
 588            click.echo("No migrations to apply.")
 589            # If there's changes that aren't in migrations yet, tell them
 590            # how to fix it.
 591            autodetector = MigrationAutodetector(
 592                executor.loader.project_state(),
 593                ProjectState.from_models_registry(models_registry),
 594            )
 595            changes = autodetector.changes(graph=executor.loader.graph)
 596            if changes:
 597                packages = ", ".join(sorted(changes))
 598                click.echo(
 599                    f"Your models have changes that are not yet reflected in migrations ({packages})."
 600                )
 601                click.echo(
 602                    "Run 'plain makemigrations' to create migrations for these changes."
 603                )
 604
 605
 606@cli.command("list")
 607@click.argument("package_labels", nargs=-1)
 608@click.option(
 609    "--format",
 610    type=click.Choice(["list", "plan"]),
 611    default="list",
 612    help="Output format.",
 613)
 614@click.option(
 615    "-v",
 616    "--verbosity",
 617    type=int,
 618    default=1,
 619    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 620)
 621def list_migrations(
 622    package_labels: tuple[str, ...], format: str, verbosity: int
 623) -> None:
 624    """Show all migrations"""
 625
 626    def _validate_package_names(package_names: tuple[str, ...]) -> None:
 627        has_bad_names = False
 628        for package_name in package_names:
 629            try:
 630                packages_registry.get_package_config(package_name)
 631            except LookupError as err:
 632                click.echo(str(err), err=True)
 633                has_bad_names = True
 634        if has_bad_names:
 635            sys.exit(2)
 636
 637    def show_list(db_connection: Any, package_names: tuple[str, ...]) -> None:
 638        """
 639        Show a list of all migrations on the system, or only those of
 640        some named packages.
 641        """
 642        # Load migrations from disk/DB
 643        loader = MigrationLoader(db_connection, ignore_no_migrations=True)
 644        recorder = MigrationRecorder(db_connection)
 645        recorded_migrations = recorder.applied_migrations()
 646
 647        graph = loader.graph
 648        # If we were passed a list of packages, validate it
 649        package_names_list: list[str]
 650        if package_names:
 651            _validate_package_names(package_names)
 652            package_names_list = list(package_names)
 653        # Otherwise, show all packages in alphabetic order
 654        else:
 655            package_names_list = sorted(loader.migrated_packages)
 656        # For each app, print its migrations in order from oldest (roots) to
 657        # newest (leaves).
 658        for package_name in package_names_list:
 659            click.secho(package_name, fg="cyan", bold=True)
 660            shown = set()
 661            for node in graph.leaf_nodes(package_name):
 662                for plan_node in graph.forwards_plan(node):
 663                    if plan_node not in shown and plan_node[0] == package_name:
 664                        # Give it a nice title if it's a squashed one
 665                        title = plan_node[1]
 666                        if graph.nodes[plan_node].replaces:  # type: ignore[union-attr]
 667                            title += f" ({len(graph.nodes[plan_node].replaces)} squashed migrations)"  # type: ignore[union-attr]
 668                        applied_migration = loader.applied_migrations.get(plan_node)  # type: ignore[union-attr]
 669                        # Mark it as applied/unapplied
 670                        if applied_migration:
 671                            if plan_node in recorded_migrations:
 672                                output = f" [X] {title}"
 673                            else:
 674                                title += " Run `plain migrate` to finish recording."
 675                                output = f" [-] {title}"
 676                            if verbosity >= 2 and hasattr(applied_migration, "applied"):
 677                                output += f" (applied at {applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')})"
 678                            click.echo(output)
 679                        else:
 680                            click.echo(f" [ ] {title}")
 681                        shown.add(plan_node)
 682            # If we didn't print anything, then a small message
 683            if not shown:
 684                click.secho(" (no migrations)", fg="red")
 685
 686    def show_plan(db_connection: Any, package_names: tuple[str, ...]) -> None:
 687        """
 688        Show all known migrations (or only those of the specified package_names)
 689        in the order they will be applied.
 690        """
 691        # Load migrations from disk/DB
 692        loader = MigrationLoader(db_connection)
 693        graph = loader.graph
 694        if package_names:
 695            _validate_package_names(package_names)
 696            targets = [key for key in graph.leaf_nodes() if key[0] in package_names]
 697        else:
 698            targets = graph.leaf_nodes()
 699        plan = []
 700        seen = set()
 701
 702        # Generate the plan
 703        for target in targets:
 704            for migration in graph.forwards_plan(target):
 705                if migration not in seen:
 706                    node = graph.node_map[migration]
 707                    plan.append(node)
 708                    seen.add(migration)
 709
 710        # Output
 711        def print_deps(node: Any) -> str:
 712            out = []
 713            for parent in sorted(node.parents):
 714                out.append(f"{parent.key[0]}.{parent.key[1]}")
 715            if out:
 716                return f" ... ({', '.join(out)})"
 717            return ""
 718
 719        for node in plan:
 720            deps = ""
 721            if verbosity >= 2:
 722                deps = print_deps(node)
 723            if node.key in loader.applied_migrations:  # type: ignore[operator]
 724                click.echo(f"[X]  {node.key[0]}.{node.key[1]}{deps}")
 725            else:
 726                click.echo(f"[ ]  {node.key[0]}.{node.key[1]}{deps}")
 727        if not plan:
 728            click.secho("(no migrations)", fg="red")
 729
 730    # Get the database we're operating from
 731
 732    if format == "plan":
 733        show_plan(db_connection, package_labels)
 734    else:
 735        show_list(db_connection, package_labels)
 736
 737
 738@cli.command("prune")
 739@click.option(
 740    "--yes",
 741    is_flag=True,
 742    help="Skip confirmation prompt (for non-interactive use).",
 743)
 744def prune(yes: bool) -> None:
 745    """Remove stale migration records from the database"""
 746    # Load migrations from disk and database
 747    loader = MigrationLoader(db_connection, ignore_no_migrations=True)
 748    recorder = MigrationRecorder(db_connection)
 749    recorded_migrations = recorder.applied_migrations()
 750
 751    # Find all prunable migrations (recorded but not on disk)
 752    all_prunable = [
 753        migration
 754        for migration in recorded_migrations
 755        if migration not in loader.disk_migrations  # type: ignore[operator]
 756    ]
 757
 758    if not all_prunable:
 759        click.echo("No stale migration records found.")
 760        return
 761
 762    # Separate into existing packages vs orphaned packages
 763    existing_packages = set(loader.migrated_packages)
 764    prunable_existing: dict[str, list[str]] = {}
 765    prunable_orphaned: dict[str, list[str]] = {}
 766
 767    for migration in all_prunable:
 768        package, name = migration
 769        if package in existing_packages:
 770            if package not in prunable_existing:
 771                prunable_existing[package] = []
 772            prunable_existing[package].append(name)
 773        else:
 774            if package not in prunable_orphaned:
 775                prunable_orphaned[package] = []
 776            prunable_orphaned[package].append(name)
 777
 778    # Display what was found
 779    if prunable_existing:
 780        click.secho(
 781            "Stale migration records (from existing packages):",
 782            fg="yellow",
 783            bold=True,
 784        )
 785        for package in sorted(prunable_existing.keys()):
 786            click.secho(f"  {package}:", fg="yellow")
 787            for name in sorted(prunable_existing[package]):
 788                click.echo(f"    - {name}")
 789        click.echo()
 790
 791    if prunable_orphaned:
 792        click.secho(
 793            "Orphaned migration records (from removed packages):",
 794            fg="red",
 795            bold=True,
 796        )
 797        for package in sorted(prunable_orphaned.keys()):
 798            click.secho(f"  {package}:", fg="red")
 799            for name in sorted(prunable_orphaned[package]):
 800                click.echo(f"    - {name}")
 801        click.echo()
 802
 803    total_count = sum(len(migs) for migs in prunable_existing.values()) + sum(
 804        len(migs) for migs in prunable_orphaned.values()
 805    )
 806
 807    if not yes:
 808        click.echo(
 809            f"Found {total_count} stale migration record{'s' if total_count != 1 else ''}."
 810        )
 811        click.echo()
 812
 813        # Prompt for confirmation if interactive
 814        if not click.confirm(
 815            "Do you want to remove these migrations from the database?"
 816        ):
 817            return
 818
 819    # Actually prune the migrations
 820    click.secho("Pruning migrations...", bold=True)
 821
 822    for package, migration_names in prunable_existing.items():
 823        for name in sorted(migration_names):
 824            click.echo(f"  Pruning {package}.{name}...", nl=False)
 825            recorder.record_unapplied(package, name)
 826            click.echo(" OK")
 827
 828    for package, migration_names in prunable_orphaned.items():
 829        for name in sorted(migration_names):
 830            click.echo(f"  Pruning {package}.{name} (orphaned)...", nl=False)
 831            recorder.record_unapplied(package, name)
 832            click.echo(" OK")
 833
 834    click.secho(
 835        f"✓ Removed {total_count} stale migration record{'s' if total_count != 1 else ''}.",
 836        fg="green",
 837    )
 838
 839
 840@cli.command("squash")
 841@click.argument("package_label")
 842@click.argument("start_migration_name", required=False)
 843@click.argument("migration_name")
 844@click.option(
 845    "--no-optimize",
 846    is_flag=True,
 847    help="Do not try to optimize the squashed operations.",
 848)
 849@click.option(
 850    "--noinput",
 851    "--no-input",
 852    "no_input",
 853    is_flag=True,
 854    help="Tells Plain to NOT prompt the user for input of any kind.",
 855)
 856@click.option("--squashed-name", help="Sets the name of the new squashed migration.")
 857@click.option(
 858    "-v",
 859    "--verbosity",
 860    type=int,
 861    default=1,
 862    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 863)
 864def squash(
 865    package_label: str,
 866    start_migration_name: str | None,
 867    migration_name: str,
 868    no_optimize: bool,
 869    no_input: bool,
 870    squashed_name: str | None,
 871    verbosity: int,
 872) -> None:
 873    """Squash multiple migrations into one"""
 874    interactive = not no_input
 875
 876    def find_migration(
 877        loader: MigrationLoader, package_label: str, name: str
 878    ) -> Migration:
 879        try:
 880            return loader.get_migration_by_prefix(package_label, name)
 881        except AmbiguityError:
 882            raise click.ClickException(
 883                f"More than one migration matches '{name}' in package '{package_label}'. Please be more specific."
 884            )
 885        except KeyError:
 886            raise click.ClickException(
 887                f"Cannot find a migration matching '{name}' from package '{package_label}'."
 888            )
 889
 890    # Validate package_label
 891    try:
 892        packages_registry.get_package_config(package_label)
 893    except LookupError as err:
 894        raise click.ClickException(str(err))
 895
 896    # Load the current graph state, check the app and migration they asked for exists
 897    loader = MigrationLoader(db_connection)
 898    if package_label not in loader.migrated_packages:
 899        raise click.ClickException(
 900            f"Package '{package_label}' does not have migrations (so squashmigrations on it makes no sense)"
 901        )
 902
 903    migration = find_migration(loader, package_label, migration_name)
 904
 905    # Work out the list of predecessor migrations
 906    migrations_to_squash = [
 907        loader.get_migration(al, mn)
 908        for al, mn in loader.graph.forwards_plan(
 909            (migration.package_label, migration.name)
 910        )
 911        if al == migration.package_label
 912    ]
 913
 914    if start_migration_name:
 915        start_migration = find_migration(loader, package_label, start_migration_name)
 916        start = loader.get_migration(
 917            start_migration.package_label, start_migration.name
 918        )
 919        try:
 920            start_index = migrations_to_squash.index(start)
 921            migrations_to_squash = migrations_to_squash[start_index:]
 922        except ValueError:
 923            raise click.ClickException(
 924                f"The migration '{start_migration}' cannot be found. Maybe it comes after "
 925                f"the migration '{migration}'?\n"
 926                f"Have a look at:\n"
 927                f"  plain migrations list {package_label}\n"
 928                f"to debug this issue."
 929            )
 930
 931    # Tell them what we're doing and optionally ask if we should proceed
 932    if verbosity > 0 or interactive:
 933        click.secho("Will squash the following migrations:", fg="cyan", bold=True)
 934        for migration in migrations_to_squash:
 935            click.echo(f" - {migration.name}")
 936
 937        if interactive:
 938            if not click.confirm("Do you wish to proceed?"):
 939                return
 940
 941    # Load the operations from all those migrations and concat together,
 942    # along with collecting external dependencies and detecting double-squashing
 943    operations = []
 944    dependencies = set()
 945    # We need to take all dependencies from the first migration in the list
 946    # as it may be 0002 depending on 0001
 947    first_migration = True
 948    for smigration in migrations_to_squash:
 949        if smigration.replaces:
 950            raise click.ClickException(
 951                "You cannot squash squashed migrations! Please transition it to a "
 952                "normal migration first"
 953            )
 954        operations.extend(smigration.operations)
 955        for dependency in smigration.dependencies:
 956            if isinstance(dependency, SettingsTuple):
 957                dependencies.add(dependency)
 958            elif dependency[0] != smigration.package_label or first_migration:
 959                dependencies.add(dependency)
 960        first_migration = False
 961
 962    if no_optimize:
 963        if verbosity > 0:
 964            click.secho("(Skipping optimization.)", fg="yellow")
 965        new_operations = operations
 966    else:
 967        if verbosity > 0:
 968            click.secho("Optimizing...", fg="cyan")
 969
 970        optimizer = MigrationOptimizer()
 971        new_operations = optimizer.optimize(operations, migration.package_label)
 972
 973        if verbosity > 0:
 974            if len(new_operations) == len(operations):
 975                click.echo("  No optimizations possible.")
 976            else:
 977                click.echo(
 978                    f"  Optimized from {len(operations)} operations to {len(new_operations)} operations."
 979                )
 980
 981    # Work out the value of replaces (any squashed ones we're re-squashing)
 982    # need to feed their replaces into ours
 983    replaces = []
 984    for migration in migrations_to_squash:
 985        if migration.replaces:
 986            replaces.extend(migration.replaces)
 987        else:
 988            replaces.append((migration.package_label, migration.name))
 989
 990    # Make a new migration with those operations
 991    subclass = type(
 992        "Migration",
 993        (migrations.Migration,),
 994        {
 995            "dependencies": dependencies,
 996            "operations": new_operations,
 997            "replaces": replaces,
 998        },
 999    )
1000    if start_migration_name:
1001        if squashed_name:
1002            # Use the name from --squashed-name
1003            prefix, _ = start_migration.name.split("_", 1)
1004            name = f"{prefix}_{squashed_name}"
1005        else:
1006            # Generate a name
1007            name = f"{start_migration.name}_squashed_{migration.name}"
1008        new_migration = subclass(name, package_label)
1009    else:
1010        name = f"0001_{'squashed_' + migration.name if not squashed_name else squashed_name}"
1011        new_migration = subclass(name, package_label)
1012        new_migration.initial = True
1013
1014    # Write out the new migration file
1015    writer = MigrationWriter(new_migration)
1016    if os.path.exists(writer.path):
1017        raise click.ClickException(
1018            f"Migration {new_migration.name} already exists. Use a different name."
1019        )
1020    with open(writer.path, "w", encoding="utf-8") as fh:
1021        fh.write(writer.as_string())
1022
1023    if verbosity > 0:
1024        click.secho(
1025            f"Created new squashed migration {writer.path}", fg="green", bold=True
1026        )
1027        click.echo(
1028            "  You should commit this migration but leave the old ones in place;\n"
1029            "  the new migration will be used for new installs. Once you are sure\n"
1030            "  all instances of the codebase have applied the migrations you squashed,\n"
1031            "  you can delete them."
1032        )
1033        if writer.needs_manual_porting:
1034            click.secho("Manual porting required", fg="yellow", bold=True)
1035            click.echo(
1036                "  Your migrations contained functions that must be manually copied over,\n"
1037                "  as we could not safely copy their implementation.\n"
1038                "  See the comment at the top of the squashed migration for details."
1039            )