Plain is headed towards 1.0! Subscribe for development updates →

   1from __future__ import annotations
   2
   3import os
   4import subprocess
   5import sys
   6import time
   7from typing import TYPE_CHECKING, Any, cast
   8
   9import click
  10
  11from plain.cli import register_cli
  12from plain.packages import packages_registry
  13from plain.runtime import settings
  14from plain.utils.text import Truncator
  15
  16from . import migrations
  17from .backups.cli import cli as backups_cli
  18from .backups.core import DatabaseBackups
  19from .db import OperationalError
  20from .db import db_connection as _db_connection
  21from .migrations.autodetector import MigrationAutodetector
  22from .migrations.executor import MigrationExecutor
  23from .migrations.loader import AmbiguityError, MigrationLoader
  24from .migrations.migration import Migration, SettingsTuple
  25from .migrations.optimizer import MigrationOptimizer
  26from .migrations.questioner import (
  27    InteractiveMigrationQuestioner,
  28    NonInteractiveMigrationQuestioner,
  29)
  30from .migrations.recorder import MigrationRecorder
  31from .migrations.state import ModelState, ProjectState
  32from .migrations.writer import MigrationWriter
  33from .registry import models_registry
  34
  35if TYPE_CHECKING:
  36    from .backends.base.base import BaseDatabaseWrapper
  37    from .migrations.operations.base import Operation
  38
  39    db_connection = cast("BaseDatabaseWrapper", _db_connection)
  40else:
  41    db_connection = _db_connection
  42
  43
  44@register_cli("models")
  45@click.group()
  46def cli() -> None:
  47    pass
  48
  49
  50cli.add_command(backups_cli)
  51
  52
  53@cli.command()
  54@click.argument("parameters", nargs=-1)
  55def db_shell(parameters: tuple[str, ...]) -> None:
  56    """Runs the command-line client for specified database, or the default database if none is provided."""
  57    try:
  58        db_connection.client.runshell(list(parameters))
  59    except FileNotFoundError:
  60        # Note that we're assuming the FileNotFoundError relates to the
  61        # command missing. It could be raised for some other reason, in
  62        # which case this error message would be inaccurate. Still, this
  63        # message catches the common case.
  64        click.secho(
  65            f"You appear not to have the {db_connection.client.executable_name!r} program installed or on your path.",
  66            fg="red",
  67            err=True,
  68        )
  69        sys.exit(1)
  70    except subprocess.CalledProcessError as e:
  71        click.secho(
  72            '"{}" returned non-zero exit status {}.'.format(
  73                " ".join(e.cmd),
  74                e.returncode,
  75            ),
  76            fg="red",
  77            err=True,
  78        )
  79        sys.exit(e.returncode)
  80
  81
  82@cli.command()
  83def db_wait() -> None:
  84    """Wait for the database to be ready"""
  85    attempts = 0
  86    while True:
  87        attempts += 1
  88        waiting_for = False
  89
  90        try:
  91            db_connection.ensure_connection()
  92        except OperationalError:
  93            waiting_for = True
  94
  95        if waiting_for:
  96            if attempts > 1:
  97                # After the first attempt, start printing them
  98                click.secho(
  99                    f"Waiting for database (attempt {attempts})",
 100                    fg="yellow",
 101                )
 102            time.sleep(1.5)
 103        else:
 104            click.secho("✔ Database ready", fg="green")
 105            break
 106
 107
 108@cli.command(name="list")
 109@click.argument("package_labels", nargs=-1)
 110@click.option(
 111    "--app-only",
 112    is_flag=True,
 113    help="Only show models from packages that start with 'app'.",
 114)
 115def list_models(package_labels: tuple[str, ...], app_only: bool) -> None:
 116    """List installed models."""
 117
 118    packages = set(package_labels)
 119
 120    for model in sorted(
 121        models_registry.get_models(),
 122        key=lambda m: (m.model_options.package_label, m.model_options.model_name),
 123    ):
 124        pkg = model.model_options.package_label
 125        pkg_name = packages_registry.get_package_config(pkg).name
 126        if app_only and not pkg_name.startswith("app"):
 127            continue
 128        if packages and pkg not in packages:
 129            continue
 130        fields = ", ".join(f.name for f in model._model_meta.get_fields())
 131        click.echo(
 132            f"{click.style(pkg, fg='cyan')}.{click.style(model.__name__, fg='blue')}"
 133        )
 134        click.echo(f"  table: {model.model_options.db_table}")
 135        click.echo(f"  fields: {fields}")
 136        click.echo(f"  package: {pkg_name}\n")
 137
 138
 139@register_cli("makemigrations")
 140@cli.command()
 141@click.argument("package_labels", nargs=-1)
 142@click.option(
 143    "--dry-run",
 144    is_flag=True,
 145    help="Just show what migrations would be made; don't actually write them.",
 146)
 147@click.option("--empty", is_flag=True, help="Create an empty migration.")
 148@click.option(
 149    "--noinput",
 150    "--no-input",
 151    "no_input",
 152    is_flag=True,
 153    help="Tells Plain to NOT prompt the user for input of any kind.",
 154)
 155@click.option("-n", "--name", help="Use this name for migration file(s).")
 156@click.option(
 157    "--check",
 158    is_flag=True,
 159    help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.",
 160)
 161@click.option(
 162    "-v",
 163    "--verbosity",
 164    type=int,
 165    default=1,
 166    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 167)
 168def makemigrations(
 169    package_labels: tuple[str, ...],
 170    dry_run: bool,
 171    empty: bool,
 172    no_input: bool,
 173    name: str | None,
 174    check: bool,
 175    verbosity: int,
 176) -> None:
 177    """Creates new migration(s) for packages."""
 178
 179    written_files: list[str] = []
 180    interactive = not no_input
 181    migration_name = name
 182    check_changes = check
 183
 184    def log(msg: str, level: int = 1) -> None:
 185        if verbosity >= level:
 186            click.echo(msg)
 187
 188    def write_migration_files(
 189        changes: dict[str, list[Migration]],
 190        update_previous_migration_paths: dict[str, str] | None = None,
 191    ) -> None:
 192        """Take a changes dict and write them out as migration files."""
 193        directory_created = {}
 194        for package_label, package_migrations in changes.items():
 195            log(
 196                click.style(f"Migrations for '{package_label}':", fg="cyan", bold=True),
 197                level=1,
 198            )
 199            for migration in package_migrations:
 200                writer = MigrationWriter(migration)
 201                migration_string = os.path.relpath(writer.path)
 202                log(f"  {click.style(migration_string, fg='yellow')}\n", level=1)
 203                for operation in migration.operations:
 204                    log(f"    - {operation.describe()}", level=1)
 205
 206                if not dry_run:
 207                    migrations_directory = os.path.dirname(writer.path)
 208                    if not directory_created.get(package_label):
 209                        os.makedirs(migrations_directory, exist_ok=True)
 210                        init_path = os.path.join(migrations_directory, "__init__.py")
 211                        if not os.path.isfile(init_path):
 212                            open(init_path, "w").close()
 213                        directory_created[package_label] = True
 214
 215                    migration_string = writer.as_string()
 216                    with open(writer.path, "w", encoding="utf-8") as fh:
 217                        fh.write(migration_string)
 218                        written_files.append(writer.path)
 219
 220                    if update_previous_migration_paths:
 221                        prev_path = update_previous_migration_paths[package_label]
 222                        if writer.needs_manual_porting:
 223                            log(
 224                                click.style(
 225                                    f"Updated migration {migration_string} requires manual porting.\n"
 226                                    f"Previous migration {os.path.relpath(prev_path)} was kept and "
 227                                    f"must be deleted after porting functions manually.",
 228                                    fg="yellow",
 229                                ),
 230                                level=1,
 231                            )
 232                        else:
 233                            os.remove(prev_path)
 234                            log(f"Deleted {os.path.relpath(prev_path)}", level=1)
 235                elif verbosity >= 3:
 236                    log(
 237                        click.style(
 238                            f"Full migrations file '{writer.filename}':",
 239                            fg="cyan",
 240                            bold=True,
 241                        ),
 242                        level=3,
 243                    )
 244                    log(writer.as_string(), level=3)
 245
 246    # Validate package labels
 247    package_labels_set = set(package_labels)
 248    has_bad_labels = False
 249    for package_label in package_labels_set:
 250        try:
 251            packages_registry.get_package_config(package_label)
 252        except LookupError as err:
 253            click.echo(str(err), err=True)
 254            has_bad_labels = True
 255    if has_bad_labels:
 256        sys.exit(2)
 257
 258    # Load the current graph state
 259    loader = MigrationLoader(None, ignore_no_migrations=True)
 260
 261    # Raise an error if any migrations are applied before their dependencies.
 262    # Only the default db_connection is supported.
 263    loader.check_consistent_history(db_connection)
 264
 265    # Check for conflicts
 266    conflicts = loader.detect_conflicts()
 267    if package_labels_set:
 268        conflicts = {
 269            package_label: conflict
 270            for package_label, conflict in conflicts.items()
 271            if package_label in package_labels_set
 272        }
 273
 274    if conflicts:
 275        name_str = "; ".join(
 276            "{} in {}".format(", ".join(names), package)
 277            for package, names in conflicts.items()
 278        )
 279        raise click.ClickException(
 280            f"Conflicting migrations detected; multiple leaf nodes in the "
 281            f"migration graph: ({name_str})."
 282        )
 283
 284    # Set up questioner
 285    if interactive:
 286        questioner = InteractiveMigrationQuestioner(
 287            specified_packages=package_labels_set,
 288            dry_run=dry_run,
 289        )
 290    else:
 291        questioner = NonInteractiveMigrationQuestioner(
 292            specified_packages=package_labels_set,
 293            dry_run=dry_run,
 294            verbosity=verbosity,
 295        )
 296
 297    # Set up autodetector
 298    autodetector = MigrationAutodetector(
 299        loader.project_state(),
 300        ProjectState.from_models_registry(models_registry),
 301        questioner,
 302    )
 303
 304    # Handle empty migrations if requested
 305    if empty:
 306        if not package_labels_set:
 307            raise click.ClickException(
 308                "You must supply at least one package label when using --empty."
 309            )
 310        changes = {
 311            package: [Migration("custom", package)] for package in package_labels_set
 312        }
 313        changes = autodetector.arrange_for_graph(
 314            changes=changes,
 315            graph=loader.graph,
 316            migration_name=migration_name,
 317        )
 318        write_migration_files(changes)
 319        return
 320
 321    # Detect changes
 322    changes = autodetector.changes(
 323        graph=loader.graph,
 324        trim_to_packages=package_labels_set or None,
 325        convert_packages=package_labels_set or None,
 326        migration_name=migration_name,
 327    )
 328
 329    if not changes:
 330        log(
 331            "No changes detected"
 332            if not package_labels_set
 333            else f"No changes detected in {'package' if len(package_labels_set) == 1 else 'packages'} "
 334            f"'{', '.join(package_labels_set)}'",
 335            level=1,
 336        )
 337    else:
 338        if check_changes:
 339            sys.exit(1)
 340
 341        write_migration_files(changes)
 342
 343
 344@register_cli("migrate")
 345@cli.command()
 346@click.argument("package_label", required=False)
 347@click.argument("migration_name", required=False)
 348@click.option(
 349    "--fake", is_flag=True, help="Mark migrations as run without actually running them."
 350)
 351@click.option(
 352    "--plan",
 353    is_flag=True,
 354    help="Shows a list of the migration actions that will be performed.",
 355)
 356@click.option(
 357    "--check",
 358    "check_unapplied",
 359    is_flag=True,
 360    help="Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.",
 361)
 362@click.option(
 363    "--backup/--no-backup",
 364    "backup",
 365    is_flag=True,
 366    default=None,
 367    help="Explicitly enable/disable pre-migration backups.",
 368)
 369@click.option(
 370    "--no-input",
 371    "--noinput",
 372    "no_input",
 373    is_flag=True,
 374    help="Tells Plain to NOT prompt the user for input of any kind.",
 375)
 376@click.option(
 377    "--atomic-batch/--no-atomic-batch",
 378    default=None,
 379    help="Run migrations in a single transaction (auto-detected by default)",
 380)
 381@click.option(
 382    "--quiet",
 383    is_flag=True,
 384    help="Suppress migration output (used for test database creation).",
 385)
 386def migrate(
 387    package_label: str | None,
 388    migration_name: str | None,
 389    fake: bool,
 390    plan: bool,
 391    check_unapplied: bool,
 392    backup: bool | None,
 393    no_input: bool,
 394    atomic_batch: bool | None,
 395    quiet: bool,
 396) -> None:
 397    """Updates database schema. Manages both packages with migrations and those without."""
 398
 399    def migration_progress_callback(
 400        action: str,
 401        *,
 402        migration: Migration | None = None,
 403        fake: bool = False,
 404        operation: Operation | None = None,
 405        sql_statements: list[str] | None = None,
 406    ) -> None:
 407        if quiet:
 408            return
 409
 410        if action == "apply_start":
 411            click.echo()  # Always add newline between migrations
 412            if fake:
 413                click.secho(f"{migration} (faked)", fg="cyan")
 414            else:
 415                click.secho(f"{migration}", fg="cyan")
 416        elif action == "apply_success":
 417            pass  # Already shown via operations
 418        elif action == "operation_start":
 419            click.echo(f"  {operation.describe()}", nl=False)
 420            click.secho("... ", dim=True, nl=False)
 421        elif action == "operation_success":
 422            # Show SQL statements (no OK needed, SQL implies success)
 423            if sql_statements:
 424                click.echo()  # newline after "..."
 425                for sql in sql_statements:
 426                    click.secho(f"    {sql}", dim=True)
 427            else:
 428                # No SQL: just add a newline
 429                click.echo()
 430
 431    def describe_operation(operation: Any) -> tuple[str, bool]:
 432        """Return a string that describes a migration operation for --plan."""
 433        prefix = ""
 434        is_error = False
 435        if hasattr(operation, "code"):
 436            code = operation.code
 437            action = (code.__doc__ or "") if code else None
 438        elif hasattr(operation, "sql"):
 439            action = operation.sql
 440        else:
 441            action = ""
 442        if action is not None:
 443            action = str(action).replace("\n", "")
 444        if action:
 445            action = " -> " + action
 446        truncated = Truncator(action)
 447        return prefix + operation.describe() + truncated.chars(40), is_error
 448
 449    # Get the database we're operating from
 450    # Hook for backends needing any database preparation
 451    db_connection.prepare_database()
 452
 453    # Work out which packages have migrations and which do not
 454    executor = MigrationExecutor(db_connection, migration_progress_callback)
 455
 456    # Raise an error if any migrations are applied before their dependencies.
 457    executor.loader.check_consistent_history(db_connection)
 458
 459    # Before anything else, see if there's conflicting packages and drop out
 460    # hard if there are any
 461    conflicts = executor.loader.detect_conflicts()
 462    if conflicts:
 463        name_str = "; ".join(
 464            "{} in {}".format(", ".join(names), package)
 465            for package, names in conflicts.items()
 466        )
 467        raise click.ClickException(
 468            "Conflicting migrations detected; multiple leaf nodes in the "
 469            f"migration graph: ({name_str})."
 470        )
 471
 472    # If they supplied command line arguments, work out what they mean.
 473    target_package_labels_only = True
 474    targets: list[tuple[str, str]]
 475    if package_label:
 476        try:
 477            packages_registry.get_package_config(package_label)
 478        except LookupError as err:
 479            raise click.ClickException(str(err))
 480
 481        if package_label not in executor.loader.migrated_packages:
 482            raise click.ClickException(
 483                f"Package '{package_label}' does not have migrations."
 484            )
 485
 486    if package_label and migration_name:
 487        try:
 488            migration = executor.loader.get_migration_by_prefix(
 489                package_label, migration_name
 490            )
 491        except AmbiguityError:
 492            raise click.ClickException(
 493                f"More than one migration matches '{migration_name}' in package '{package_label}'. "
 494                "Please be more specific."
 495            )
 496        except KeyError:
 497            raise click.ClickException(
 498                f"Cannot find a migration matching '{migration_name}' from package '{package_label}'."
 499            )
 500        target: tuple[str, str] = (package_label, migration.name)
 501        if (
 502            target not in executor.loader.graph.nodes
 503            and target in executor.loader.replacements
 504        ):
 505            incomplete_migration = executor.loader.replacements[target]
 506            target = incomplete_migration.replaces[-1]  # type: ignore[assignment]
 507        targets = [target]
 508        target_package_labels_only = False
 509    elif package_label:
 510        targets = [
 511            key for key in executor.loader.graph.leaf_nodes() if key[0] == package_label
 512        ]
 513    else:
 514        targets = list(executor.loader.graph.leaf_nodes())
 515
 516    migration_plan = executor.migration_plan(targets)
 517
 518    if plan:
 519        if not quiet:
 520            click.secho("Planned operations:", fg="cyan")
 521            if not migration_plan:
 522                click.echo("  No planned migration operations.")
 523            else:
 524                for migration in migration_plan:
 525                    click.secho(str(migration), fg="cyan")
 526                    for operation in migration.operations:
 527                        message, is_error = describe_operation(operation)
 528                        if is_error:
 529                            click.secho("    " + message, fg="yellow")
 530                        else:
 531                            click.echo("    " + message)
 532        if check_unapplied:
 533            sys.exit(1)
 534        return
 535
 536    if check_unapplied:
 537        if migration_plan:
 538            sys.exit(1)
 539        return
 540
 541    # Print some useful info
 542    if not quiet:
 543        if target_package_labels_only:
 544            packages = ", ".join(sorted({a for a, n in targets})) or "(none)"
 545            click.secho("Packages: ", bold=True, nl=False)
 546            click.secho(packages, dim=True)
 547            click.echo()  # Add newline after packages
 548        else:
 549            click.secho("Target: ", bold=True, nl=False)
 550            click.secho(f"{targets[0][1]} from {targets[0][0]}", dim=True)
 551            click.echo()  # Add newline after target
 552
 553    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)
 554
 555    if migration_plan:
 556        # Determine whether to use atomic batch
 557        use_atomic_batch = False
 558        atomic_batch_message = None
 559        if len(migration_plan) > 1:
 560            # Check database capabilities
 561            can_rollback_ddl = db_connection.features.can_rollback_ddl
 562
 563            # Check if all migrations support atomic
 564            non_atomic_migrations = [m for m in migration_plan if not m.atomic]
 565
 566            if atomic_batch is True:
 567                # User explicitly requested atomic batch
 568                if not can_rollback_ddl:
 569                    raise click.UsageError(
 570                        f"--atomic-batch not supported on {db_connection.vendor}. "
 571                        "Remove the flag or use a database that supports transactional DDL."
 572                    )
 573                if non_atomic_migrations:
 574                    names = ", ".join(
 575                        f"{m.package_label}.{m.name}" for m in non_atomic_migrations[:3]
 576                    )
 577                    if len(non_atomic_migrations) > 3:
 578                        names += f", and {len(non_atomic_migrations) - 3} more"
 579                    raise click.UsageError(
 580                        f"--atomic-batch requested but these migrations have atomic=False: {names}"
 581                    )
 582                use_atomic_batch = True
 583                atomic_batch_message = (
 584                    f"Running {len(migration_plan)} migrations in atomic batch"
 585                )
 586            elif atomic_batch is False:
 587                # User explicitly disabled atomic batch
 588                use_atomic_batch = False
 589                if len(migration_plan) > 1:
 590                    atomic_batch_message = (
 591                        f"Running {len(migration_plan)} migrations separately"
 592                    )
 593            else:
 594                # Auto-detect (atomic_batch is None)
 595                if can_rollback_ddl and not non_atomic_migrations:
 596                    use_atomic_batch = True
 597                    atomic_batch_message = (
 598                        f"Running {len(migration_plan)} migrations in atomic batch"
 599                    )
 600                else:
 601                    use_atomic_batch = False
 602                    if len(migration_plan) > 1:
 603                        if not can_rollback_ddl:
 604                            atomic_batch_message = f"Running {len(migration_plan)} migrations separately ({db_connection.vendor} doesn't support batch)"
 605                        elif non_atomic_migrations:
 606                            atomic_batch_message = f"Running {len(migration_plan)} migrations separately (some have atomic=False)"
 607                        else:
 608                            atomic_batch_message = (
 609                                f"Running {len(migration_plan)} migrations separately"
 610                            )
 611
 612        if backup or (backup is None and settings.DEBUG):
 613            backup_name = f"migrate_{time.strftime('%Y%m%d_%H%M%S')}"
 614            if not quiet:
 615                click.secho("Creating backup: ", bold=True, nl=False)
 616                click.secho(f"{backup_name}", dim=True, nl=False)
 617                click.secho("... ", dim=True, nl=False)
 618
 619            backups_handler = DatabaseBackups()
 620            backups_handler.create(
 621                backup_name,
 622                pg_dump=os.environ.get("PG_DUMP", "pg_dump"),
 623            )
 624
 625            if not quiet:
 626                click.echo(click.style("OK", fg="green"))
 627                click.echo()  # Add blank line after backup output
 628        else:
 629            if not quiet:
 630                click.echo()  # Add blank line after packages/target info
 631
 632        if not quiet:
 633            if atomic_batch_message:
 634                click.secho(
 635                    f"Applying migrations ({atomic_batch_message.lower()}):", bold=True
 636                )
 637            else:
 638                click.secho("Applying migrations:", bold=True)
 639        post_migrate_state = executor.migrate(
 640            targets,
 641            plan=migration_plan,
 642            state=pre_migrate_state.clone(),
 643            fake=fake,
 644            atomic_batch=use_atomic_batch,
 645        )
 646        # post_migrate signals have access to all models. Ensure that all models
 647        # are reloaded in case any are delayed.
 648        post_migrate_state.clear_delayed_models_cache()
 649        post_migrate_packages = post_migrate_state.models_registry
 650
 651        # Re-render models of real packages to include relationships now that
 652        # we've got a final state. This wouldn't be necessary if real packages
 653        # models were rendered with relationships in the first place.
 654        with post_migrate_packages.bulk_update():
 655            model_keys = []
 656            for model_state in post_migrate_packages.real_models:
 657                model_key = model_state.package_label, model_state.name_lower
 658                model_keys.append(model_key)
 659                post_migrate_packages.unregister_model(*model_key)
 660        post_migrate_packages.render_multiple(
 661            [
 662                ModelState.from_model(models_registry.get_model(*model))
 663                for model in model_keys
 664            ]
 665        )
 666
 667    else:
 668        if not quiet:
 669            click.echo("No migrations to apply.")
 670            # If there's changes that aren't in migrations yet, tell them
 671            # how to fix it.
 672            autodetector = MigrationAutodetector(
 673                executor.loader.project_state(),
 674                ProjectState.from_models_registry(models_registry),
 675            )
 676            changes = autodetector.changes(graph=executor.loader.graph)
 677            if changes:
 678                packages = ", ".join(sorted(changes))
 679                click.echo(
 680                    f"Your models have changes that are not yet reflected in migrations ({packages})."
 681                )
 682                click.echo(
 683                    "Run 'plain makemigrations' to create migrations for these changes."
 684                )
 685
 686
 687@cli.command()
 688@click.argument("package_labels", nargs=-1)
 689@click.option(
 690    "--format",
 691    type=click.Choice(["list", "plan"]),
 692    default="list",
 693    help="Output format.",
 694)
 695@click.option(
 696    "-v",
 697    "--verbosity",
 698    type=int,
 699    default=1,
 700    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 701)
 702def show_migrations(
 703    package_labels: tuple[str, ...], format: str, verbosity: int
 704) -> None:
 705    """Shows all available migrations for the current project"""
 706
 707    def _validate_package_names(package_names: tuple[str, ...]) -> None:
 708        has_bad_names = False
 709        for package_name in package_names:
 710            try:
 711                packages_registry.get_package_config(package_name)
 712            except LookupError as err:
 713                click.echo(str(err), err=True)
 714                has_bad_names = True
 715        if has_bad_names:
 716            sys.exit(2)
 717
 718    def show_list(db_connection: Any, package_names: tuple[str, ...]) -> None:
 719        """
 720        Show a list of all migrations on the system, or only those of
 721        some named packages.
 722        """
 723        # Load migrations from disk/DB
 724        loader = MigrationLoader(db_connection, ignore_no_migrations=True)
 725        recorder = MigrationRecorder(db_connection)
 726        recorded_migrations = recorder.applied_migrations()
 727
 728        graph = loader.graph
 729        # If we were passed a list of packages, validate it
 730        package_names_list: list[str]
 731        if package_names:
 732            _validate_package_names(package_names)
 733            package_names_list = list(package_names)
 734        # Otherwise, show all packages in alphabetic order
 735        else:
 736            package_names_list = sorted(loader.migrated_packages)
 737        # For each app, print its migrations in order from oldest (roots) to
 738        # newest (leaves).
 739        for package_name in package_names_list:
 740            click.secho(package_name, fg="cyan", bold=True)
 741            shown = set()
 742            for node in graph.leaf_nodes(package_name):
 743                for plan_node in graph.forwards_plan(node):
 744                    if plan_node not in shown and plan_node[0] == package_name:
 745                        # Give it a nice title if it's a squashed one
 746                        title = plan_node[1]
 747                        if graph.nodes[plan_node].replaces:  # type: ignore[union-attr]
 748                            title += f" ({len(graph.nodes[plan_node].replaces)} squashed migrations)"  # type: ignore[union-attr]
 749                        applied_migration = loader.applied_migrations.get(plan_node)  # type: ignore[union-attr]
 750                        # Mark it as applied/unapplied
 751                        if applied_migration:
 752                            if plan_node in recorded_migrations:
 753                                output = f" [X] {title}"
 754                            else:
 755                                title += " Run `plain migrate` to finish recording."
 756                                output = f" [-] {title}"
 757                            if verbosity >= 2 and hasattr(applied_migration, "applied"):
 758                                output += f" (applied at {applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')})"
 759                            click.echo(output)
 760                        else:
 761                            click.echo(f" [ ] {title}")
 762                        shown.add(plan_node)
 763            # If we didn't print anything, then a small message
 764            if not shown:
 765                click.secho(" (no migrations)", fg="red")
 766
 767    def show_plan(db_connection: Any, package_names: tuple[str, ...]) -> None:
 768        """
 769        Show all known migrations (or only those of the specified package_names)
 770        in the order they will be applied.
 771        """
 772        # Load migrations from disk/DB
 773        loader = MigrationLoader(db_connection)
 774        graph = loader.graph
 775        if package_names:
 776            _validate_package_names(package_names)
 777            targets = [key for key in graph.leaf_nodes() if key[0] in package_names]
 778        else:
 779            targets = graph.leaf_nodes()
 780        plan = []
 781        seen = set()
 782
 783        # Generate the plan
 784        for target in targets:
 785            for migration in graph.forwards_plan(target):
 786                if migration not in seen:
 787                    node = graph.node_map[migration]
 788                    plan.append(node)
 789                    seen.add(migration)
 790
 791        # Output
 792        def print_deps(node: Any) -> str:
 793            out = []
 794            for parent in sorted(node.parents):
 795                out.append(f"{parent.key[0]}.{parent.key[1]}")
 796            if out:
 797                return f" ... ({', '.join(out)})"
 798            return ""
 799
 800        for node in plan:
 801            deps = ""
 802            if verbosity >= 2:
 803                deps = print_deps(node)
 804            if node.key in loader.applied_migrations:  # type: ignore[operator]
 805                click.echo(f"[X]  {node.key[0]}.{node.key[1]}{deps}")
 806            else:
 807                click.echo(f"[ ]  {node.key[0]}.{node.key[1]}{deps}")
 808        if not plan:
 809            click.secho("(no migrations)", fg="red")
 810
 811    # Get the database we're operating from
 812
 813    if format == "plan":
 814        show_plan(db_connection, package_labels)
 815    else:
 816        show_list(db_connection, package_labels)
 817
 818
 819@cli.command()
 820@click.option(
 821    "--yes",
 822    is_flag=True,
 823    help="Skip confirmation prompt (for non-interactive use).",
 824)
 825def prune_migrations(yes: bool) -> None:
 826    """Show and optionally remove stale migration records from the database."""
 827    # Load migrations from disk and database
 828    loader = MigrationLoader(db_connection, ignore_no_migrations=True)
 829    recorder = MigrationRecorder(db_connection)
 830    recorded_migrations = recorder.applied_migrations()
 831
 832    # Find all prunable migrations (recorded but not on disk)
 833    all_prunable = [
 834        migration
 835        for migration in recorded_migrations
 836        if migration not in loader.disk_migrations  # type: ignore[operator]
 837    ]
 838
 839    if not all_prunable:
 840        click.echo("No stale migration records found.")
 841        return
 842
 843    # Separate into existing packages vs orphaned packages
 844    existing_packages = set(loader.migrated_packages)
 845    prunable_existing: dict[str, list[str]] = {}
 846    prunable_orphaned: dict[str, list[str]] = {}
 847
 848    for migration in all_prunable:
 849        package, name = migration
 850        if package in existing_packages:
 851            if package not in prunable_existing:
 852                prunable_existing[package] = []
 853            prunable_existing[package].append(name)
 854        else:
 855            if package not in prunable_orphaned:
 856                prunable_orphaned[package] = []
 857            prunable_orphaned[package].append(name)
 858
 859    # Display what was found
 860    if prunable_existing:
 861        click.secho(
 862            "Stale migration records (from existing packages):",
 863            fg="yellow",
 864            bold=True,
 865        )
 866        for package in sorted(prunable_existing.keys()):
 867            click.secho(f"  {package}:", fg="yellow")
 868            for name in sorted(prunable_existing[package]):
 869                click.echo(f"    - {name}")
 870        click.echo()
 871
 872    if prunable_orphaned:
 873        click.secho(
 874            "Orphaned migration records (from removed packages):",
 875            fg="red",
 876            bold=True,
 877        )
 878        for package in sorted(prunable_orphaned.keys()):
 879            click.secho(f"  {package}:", fg="red")
 880            for name in sorted(prunable_orphaned[package]):
 881                click.echo(f"    - {name}")
 882        click.echo()
 883
 884    total_count = sum(len(migs) for migs in prunable_existing.values()) + sum(
 885        len(migs) for migs in prunable_orphaned.values()
 886    )
 887
 888    if not yes:
 889        click.echo(
 890            f"Found {total_count} stale migration record{'s' if total_count != 1 else ''}."
 891        )
 892        click.echo()
 893
 894        # Prompt for confirmation if interactive
 895        if not click.confirm(
 896            "Do you want to remove these migrations from the database?"
 897        ):
 898            return
 899
 900    # Actually prune the migrations
 901    click.secho("Pruning migrations...", bold=True)
 902
 903    for package, migration_names in prunable_existing.items():
 904        for name in sorted(migration_names):
 905            click.echo(f"  Pruning {package}.{name}...", nl=False)
 906            recorder.record_unapplied(package, name)
 907            click.echo(" OK")
 908
 909    for package, migration_names in prunable_orphaned.items():
 910        for name in sorted(migration_names):
 911            click.echo(f"  Pruning {package}.{name} (orphaned)...", nl=False)
 912            recorder.record_unapplied(package, name)
 913            click.echo(" OK")
 914
 915    click.secho(
 916        f"✓ Removed {total_count} stale migration record{'s' if total_count != 1 else ''}.",
 917        fg="green",
 918    )
 919
 920
 921@cli.command()
 922@click.argument("package_label")
 923@click.argument("start_migration_name", required=False)
 924@click.argument("migration_name")
 925@click.option(
 926    "--no-optimize",
 927    is_flag=True,
 928    help="Do not try to optimize the squashed operations.",
 929)
 930@click.option(
 931    "--noinput",
 932    "--no-input",
 933    "no_input",
 934    is_flag=True,
 935    help="Tells Plain to NOT prompt the user for input of any kind.",
 936)
 937@click.option("--squashed-name", help="Sets the name of the new squashed migration.")
 938@click.option(
 939    "-v",
 940    "--verbosity",
 941    type=int,
 942    default=1,
 943    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 944)
 945def squash_migrations(
 946    package_label: str,
 947    start_migration_name: str | None,
 948    migration_name: str,
 949    no_optimize: bool,
 950    no_input: bool,
 951    squashed_name: str | None,
 952    verbosity: int,
 953) -> None:
 954    """
 955    Squashes an existing set of migrations (from first until specified) into a single new one.
 956    """
 957    interactive = not no_input
 958
 959    def find_migration(
 960        loader: MigrationLoader, package_label: str, name: str
 961    ) -> Migration:
 962        try:
 963            return loader.get_migration_by_prefix(package_label, name)
 964        except AmbiguityError:
 965            raise click.ClickException(
 966                f"More than one migration matches '{name}' in package '{package_label}'. Please be more specific."
 967            )
 968        except KeyError:
 969            raise click.ClickException(
 970                f"Cannot find a migration matching '{name}' from package '{package_label}'."
 971            )
 972
 973    # Validate package_label
 974    try:
 975        packages_registry.get_package_config(package_label)
 976    except LookupError as err:
 977        raise click.ClickException(str(err))
 978
 979    # Load the current graph state, check the app and migration they asked for exists
 980    loader = MigrationLoader(db_connection)
 981    if package_label not in loader.migrated_packages:
 982        raise click.ClickException(
 983            f"Package '{package_label}' does not have migrations (so squashmigrations on it makes no sense)"
 984        )
 985
 986    migration = find_migration(loader, package_label, migration_name)
 987
 988    # Work out the list of predecessor migrations
 989    migrations_to_squash = [
 990        loader.get_migration(al, mn)
 991        for al, mn in loader.graph.forwards_plan(
 992            (migration.package_label, migration.name)
 993        )
 994        if al == migration.package_label
 995    ]
 996
 997    if start_migration_name:
 998        start_migration = find_migration(loader, package_label, start_migration_name)
 999        start = loader.get_migration(
1000            start_migration.package_label, start_migration.name
1001        )
1002        try:
1003            start_index = migrations_to_squash.index(start)
1004            migrations_to_squash = migrations_to_squash[start_index:]
1005        except ValueError:
1006            raise click.ClickException(
1007                f"The migration '{start_migration}' cannot be found. Maybe it comes after "
1008                f"the migration '{migration}'?\n"
1009                f"Have a look at:\n"
1010                f"  plain models show-migrations {package_label}\n"
1011                f"to debug this issue."
1012            )
1013
1014    # Tell them what we're doing and optionally ask if we should proceed
1015    if verbosity > 0 or interactive:
1016        click.secho("Will squash the following migrations:", fg="cyan", bold=True)
1017        for migration in migrations_to_squash:
1018            click.echo(f" - {migration.name}")
1019
1020        if interactive:
1021            if not click.confirm("Do you wish to proceed?"):
1022                return
1023
1024    # Load the operations from all those migrations and concat together,
1025    # along with collecting external dependencies and detecting double-squashing
1026    operations = []
1027    dependencies = set()
1028    # We need to take all dependencies from the first migration in the list
1029    # as it may be 0002 depending on 0001
1030    first_migration = True
1031    for smigration in migrations_to_squash:
1032        if smigration.replaces:
1033            raise click.ClickException(
1034                "You cannot squash squashed migrations! Please transition it to a "
1035                "normal migration first"
1036            )
1037        operations.extend(smigration.operations)
1038        for dependency in smigration.dependencies:
1039            if isinstance(dependency, SettingsTuple):
1040                dependencies.add(dependency)
1041            elif dependency[0] != smigration.package_label or first_migration:
1042                dependencies.add(dependency)
1043        first_migration = False
1044
1045    if no_optimize:
1046        if verbosity > 0:
1047            click.secho("(Skipping optimization.)", fg="yellow")
1048        new_operations = operations
1049    else:
1050        if verbosity > 0:
1051            click.secho("Optimizing...", fg="cyan")
1052
1053        optimizer = MigrationOptimizer()
1054        new_operations = optimizer.optimize(operations, migration.package_label)
1055
1056        if verbosity > 0:
1057            if len(new_operations) == len(operations):
1058                click.echo("  No optimizations possible.")
1059            else:
1060                click.echo(
1061                    f"  Optimized from {len(operations)} operations to {len(new_operations)} operations."
1062                )
1063
1064    # Work out the value of replaces (any squashed ones we're re-squashing)
1065    # need to feed their replaces into ours
1066    replaces = []
1067    for migration in migrations_to_squash:
1068        if migration.replaces:
1069            replaces.extend(migration.replaces)
1070        else:
1071            replaces.append((migration.package_label, migration.name))
1072
1073    # Make a new migration with those operations
1074    subclass = type(
1075        "Migration",
1076        (migrations.Migration,),
1077        {
1078            "dependencies": dependencies,
1079            "operations": new_operations,
1080            "replaces": replaces,
1081        },
1082    )
1083    if start_migration_name:
1084        if squashed_name:
1085            # Use the name from --squashed-name
1086            prefix, _ = start_migration.name.split("_", 1)
1087            name = f"{prefix}_{squashed_name}"
1088        else:
1089            # Generate a name
1090            name = f"{start_migration.name}_squashed_{migration.name}"
1091        new_migration = subclass(name, package_label)
1092    else:
1093        name = f"0001_{'squashed_' + migration.name if not squashed_name else squashed_name}"
1094        new_migration = subclass(name, package_label)
1095        new_migration.initial = True
1096
1097    # Write out the new migration file
1098    writer = MigrationWriter(new_migration)
1099    if os.path.exists(writer.path):
1100        raise click.ClickException(
1101            f"Migration {new_migration.name} already exists. Use a different name."
1102        )
1103    with open(writer.path, "w", encoding="utf-8") as fh:
1104        fh.write(writer.as_string())
1105
1106    if verbosity > 0:
1107        click.secho(
1108            f"Created new squashed migration {writer.path}", fg="green", bold=True
1109        )
1110        click.echo(
1111            "  You should commit this migration but leave the old ones in place;\n"
1112            "  the new migration will be used for new installs. Once you are sure\n"
1113            "  all instances of the codebase have applied the migrations you squashed,\n"
1114            "  you can delete them."
1115        )
1116        if writer.needs_manual_porting:
1117            click.secho("Manual porting required", fg="yellow", bold=True)
1118            click.echo(
1119                "  Your migrations contained functions that must be manually copied over,\n"
1120                "  as we could not safely copy their implementation.\n"
1121                "  See the comment at the top of the squashed migration for details."
1122            )