1from __future__ import annotations
   2
   3import os
   4import sys
   5import time
   6from typing import TYPE_CHECKING, Any
   7
   8import click
   9
  10from plain.cli import register_cli
  11from plain.cli.runtime import common_command
  12from plain.packages import packages_registry
  13from plain.runtime import settings
  14from plain.utils.text import Truncator
  15
  16from .. import migrations
  17from ..backups.core import DatabaseBackups
  18from ..db import get_connection
  19from ..migrations.autodetector import MigrationAutodetector
  20from ..migrations.executor import MigrationExecutor
  21from ..migrations.loader import AmbiguityError, MigrationLoader
  22from ..migrations.migration import Migration, SettingsTuple
  23from ..migrations.optimizer import MigrationOptimizer
  24from ..migrations.questioner import (
  25    InteractiveMigrationQuestioner,
  26    NonInteractiveMigrationQuestioner,
  27)
  28from ..migrations.recorder import MigrationRecorder
  29from ..migrations.state import ModelState, ProjectState
  30from ..migrations.writer import MigrationWriter
  31from ..registry import models_registry
  32
  33if TYPE_CHECKING:
  34    from ..connection import DatabaseConnection
  35    from ..migrations.operations.base import Operation
  36
  37
  38@register_cli("migrations")
  39@click.group()
  40def cli() -> None:
  41    """Database migration management"""
  42
  43
  44@common_command
  45@register_cli("makemigrations", shortcut_for="migrations make")
  46@cli.command("make")
  47@click.argument("package_labels", nargs=-1)
  48@click.option(
  49    "--dry-run",
  50    is_flag=True,
  51    help="Just show what migrations would be made; don't actually write them.",
  52)
  53@click.option("--empty", is_flag=True, help="Create an empty migration.")
  54@click.option(
  55    "--noinput",
  56    "--no-input",
  57    "no_input",
  58    is_flag=True,
  59    help="Tells Plain to NOT prompt the user for input of any kind.",
  60)
  61@click.option("-n", "--name", help="Use this name for migration file(s).")
  62@click.option(
  63    "--check",
  64    is_flag=True,
  65    help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.",
  66)
  67@click.option(
  68    "-v",
  69    "--verbosity",
  70    type=int,
  71    default=1,
  72    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
  73)
  74def make(
  75    package_labels: tuple[str, ...],
  76    dry_run: bool,
  77    empty: bool,
  78    no_input: bool,
  79    name: str | None,
  80    check: bool,
  81    verbosity: int,
  82) -> None:
  83    """Create new database migrations"""
  84
  85    written_files: list[str] = []
  86    interactive = not no_input
  87    migration_name = name
  88    check_changes = check
  89
  90    def log(msg: str, level: int = 1) -> None:
  91        if verbosity >= level:
  92            click.echo(msg)
  93
  94    def collect_sql_for_migration(
  95        migration: Migration,
  96        project_state: ProjectState,
  97    ) -> tuple[list[str], ProjectState]:
  98        """Apply a migration in collect mode and return the SQL statements."""
  99        with get_connection().schema_editor(collect_sql=True) as editor:
 100            new_state = migration.apply(project_state, editor)
 101            return list(editor.executed_sql), new_state
 102
 103    def write_migration_files(
 104        changes: dict[str, list[Migration]],
 105        update_previous_migration_paths: dict[str, str] | None = None,
 106    ) -> None:
 107        """Take a changes dict and write them out as migration files."""
 108        directory_created = {}
 109        # Track state for SQL collection in dry-run mode.
 110        sql_state = loader.project_state() if dry_run else None
 111        for package_label, package_migrations in changes.items():
 112            log(
 113                click.style(f"Migrations for '{package_label}':", fg="cyan", bold=True),
 114                level=1,
 115            )
 116            for migration in package_migrations:
 117                writer = MigrationWriter(migration)
 118                migration_string = os.path.relpath(writer.path)
 119                log(f"  {click.style(migration_string, fg='yellow')}\n", level=1)
 120                for operation in migration.operations:
 121                    log(f"    - {operation.describe()}", level=1)
 122
 123                if not dry_run:
 124                    migrations_directory = os.path.dirname(writer.path)
 125                    if not directory_created.get(package_label):
 126                        os.makedirs(migrations_directory, exist_ok=True)
 127                        init_path = os.path.join(migrations_directory, "__init__.py")
 128                        if not os.path.isfile(init_path):
 129                            open(init_path, "w").close()
 130                        directory_created[package_label] = True
 131
 132                    migration_string = writer.as_string()
 133                    with open(writer.path, "w", encoding="utf-8") as fh:
 134                        fh.write(migration_string)
 135                        written_files.append(writer.path)
 136
 137                    if update_previous_migration_paths:
 138                        prev_path = update_previous_migration_paths[package_label]
 139                        if writer.needs_manual_porting:
 140                            log(
 141                                click.style(
 142                                    f"Updated migration {migration_string} requires manual porting.\n"
 143                                    f"Previous migration {os.path.relpath(prev_path)} was kept and "
 144                                    f"must be deleted after porting functions manually.",
 145                                    fg="yellow",
 146                                ),
 147                                level=1,
 148                            )
 149                        else:
 150                            os.remove(prev_path)
 151                            log(f"Deleted {os.path.relpath(prev_path)}", level=1)
 152                else:
 153                    # dry_run is True — show SQL preview and optionally the full file.
 154                    assert sql_state is not None
 155                    sql_statements, sql_state = collect_sql_for_migration(
 156                        migration, sql_state
 157                    )
 158                    if sql_statements:
 159                        log("", level=1)
 160                        log(
 161                            click.style("    SQL:", fg="green", bold=True),
 162                            level=1,
 163                        )
 164                        for sql in sql_statements:
 165                            log(f"      {sql};", level=1)
 166
 167                    if verbosity >= 3:
 168                        log(
 169                            click.style(
 170                                f"\n    Full migrations file '{writer.filename}':",
 171                                fg="cyan",
 172                                bold=True,
 173                            ),
 174                            level=3,
 175                        )
 176                        log(writer.as_string(), level=3)
 177
 178    # Validate package labels
 179    package_labels_set = set(package_labels)
 180    has_bad_labels = False
 181    for package_label in package_labels_set:
 182        try:
 183            packages_registry.get_package_config(package_label)
 184        except LookupError as err:
 185            click.echo(str(err), err=True)
 186            has_bad_labels = True
 187    if has_bad_labels:
 188        sys.exit(2)
 189
 190    # Load the current graph state
 191    loader = MigrationLoader(None, ignore_no_migrations=True)
 192
 193    # Raise an error if any migrations are applied before their dependencies.
 194    loader.check_consistent_history(get_connection())
 195
 196    # Check for conflicts
 197    conflicts = loader.detect_conflicts()
 198    if package_labels_set:
 199        conflicts = {
 200            package_label: conflict
 201            for package_label, conflict in conflicts.items()
 202            if package_label in package_labels_set
 203        }
 204
 205    if conflicts:
 206        name_str = "; ".join(
 207            "{} in {}".format(", ".join(names), package)
 208            for package, names in conflicts.items()
 209        )
 210        raise click.ClickException(
 211            f"Conflicting migrations detected; multiple leaf nodes in the "
 212            f"migration graph: ({name_str})."
 213        )
 214
 215    # Set up questioner
 216    if interactive:
 217        questioner = InteractiveMigrationQuestioner(
 218            specified_packages=package_labels_set,
 219            dry_run=dry_run,
 220        )
 221    else:
 222        questioner = NonInteractiveMigrationQuestioner(
 223            specified_packages=package_labels_set,
 224            dry_run=dry_run,
 225            verbosity=verbosity,
 226        )
 227
 228    # Set up autodetector
 229    autodetector = MigrationAutodetector(
 230        loader.project_state(),
 231        ProjectState.from_models_registry(models_registry),
 232        questioner,
 233    )
 234
 235    # Handle empty migrations if requested
 236    if empty:
 237        if not package_labels_set:
 238            raise click.ClickException(
 239                "You must supply at least one package label when using --empty."
 240            )
 241        changes = {
 242            package: [Migration("custom", package)] for package in package_labels_set
 243        }
 244        changes = autodetector.arrange_for_graph(
 245            changes=changes,
 246            graph=loader.graph,
 247            migration_name=migration_name,
 248        )
 249        write_migration_files(changes)
 250        return
 251
 252    # Detect changes
 253    changes = autodetector.changes(
 254        graph=loader.graph,
 255        trim_to_packages=package_labels_set or None,
 256        convert_packages=package_labels_set or None,
 257        migration_name=migration_name,
 258    )
 259
 260    if not changes:
 261        log(
 262            "No changes detected"
 263            if not package_labels_set
 264            else f"No changes detected in {'package' if len(package_labels_set) == 1 else 'packages'} "
 265            f"'{', '.join(package_labels_set)}'",
 266            level=1,
 267        )
 268    else:
 269        if check_changes:
 270            sys.exit(1)
 271
 272        write_migration_files(changes)
 273
 274    # Warn about packages that have models but no migrations directory.
 275    # These are silently skipped by the autodetector, which can be confusing
 276    # when setting up a new app (makemigrations says "No changes detected").
 277    unmigrated_with_models = []
 278    for package_label in sorted(loader.unmigrated_packages):
 279        module_name, _explicit = MigrationLoader.migrations_module(package_label)
 280        # Skip packages that explicitly opt out of migrations (module_name is None).
 281        if module_name is not None and models_registry.all_models.get(package_label):
 282            unmigrated_with_models.append((package_label, module_name))
 283    if unmigrated_with_models:
 284        click.echo()
 285        click.echo(
 286            click.style(
 287                "Warning: The following packages have models but no migrations directory:",
 288                fg="yellow",
 289            )
 290        )
 291        for package_label, module_name in unmigrated_with_models:
 292            module_path = module_name.replace(".", "/")
 293            click.echo(
 294                f"  - {package_label} (create {module_path}/ to enable migrations)"
 295            )
 296        click.echo()
 297        click.echo(
 298            "To create initial migrations, add the directory and run "
 299            + click.style("plain makemigrations", bold=True)
 300            + " again."
 301        )
 302
 303
 304@common_command
 305@register_cli("migrate", shortcut_for="migrations apply")
 306@cli.command("apply")
 307@click.argument("package_label", required=False)
 308@click.argument("migration_name", required=False)
 309@click.option(
 310    "--fake", is_flag=True, help="Mark migrations as run without actually running them."
 311)
 312@click.option(
 313    "--plan",
 314    is_flag=True,
 315    help="Shows a list of the migration actions that will be performed.",
 316)
 317@click.option(
 318    "--check",
 319    "check_unapplied",
 320    is_flag=True,
 321    help="Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.",
 322)
 323@click.option(
 324    "--backup/--no-backup",
 325    "backup",
 326    is_flag=True,
 327    default=None,
 328    help="Explicitly enable/disable pre-migration backups.",
 329)
 330@click.option(
 331    "--no-input",
 332    "--noinput",
 333    "no_input",
 334    is_flag=True,
 335    help="Tells Plain to NOT prompt the user for input of any kind.",
 336)
 337@click.option(
 338    "--atomic-batch/--no-atomic-batch",
 339    default=None,
 340    help="Run migrations in a single transaction (auto-detected by default)",
 341)
 342@click.option(
 343    "--quiet",
 344    is_flag=True,
 345    help="Suppress migration output (used for test database creation).",
 346)
 347def apply(
 348    package_label: str | None,
 349    migration_name: str | None,
 350    fake: bool,
 351    plan: bool,
 352    check_unapplied: bool,
 353    backup: bool | None,
 354    no_input: bool,
 355    atomic_batch: bool | None,
 356    quiet: bool,
 357) -> None:
 358    """Apply database migrations"""
 359
 360    def migration_progress_callback(
 361        action: str,
 362        *,
 363        migration: Migration | None = None,
 364        fake: bool = False,
 365        operation: Operation | None = None,
 366        sql_statements: list[str] | None = None,
 367    ) -> None:
 368        if quiet:
 369            return
 370
 371        if action == "apply_start":
 372            click.echo()  # Always add newline between migrations
 373            if fake:
 374                click.secho(f"{migration} (faked)", fg="cyan")
 375            else:
 376                click.secho(f"{migration}", fg="cyan")
 377        elif action == "apply_success":
 378            pass  # Already shown via operations
 379        elif action == "operation_start":
 380            if operation is not None:
 381                click.echo(f"  {operation.describe()}", nl=False)
 382                click.secho("... ", dim=True, nl=False)
 383        elif action == "operation_success":
 384            # Show SQL statements (no OK needed, SQL implies success)
 385            if sql_statements:
 386                click.echo()  # newline after "..."
 387                for sql in sql_statements:
 388                    click.secho(f"    {sql}", dim=True)
 389            else:
 390                # No SQL: just add a newline
 391                click.echo()
 392
 393    def describe_operation(operation: Any) -> tuple[str, bool]:
 394        """Return a string that describes a migration operation for --plan."""
 395        prefix = ""
 396        is_error = False
 397        if hasattr(operation, "code"):
 398            code = operation.code
 399            action = (code.__doc__ or "") if code else None
 400        elif hasattr(operation, "sql"):
 401            action = operation.sql
 402        else:
 403            action = ""
 404        if action is not None:
 405            action = str(action).replace("\n", "")
 406        if action:
 407            action = " -> " + action
 408        truncated = Truncator(action)
 409        return prefix + operation.describe() + truncated.chars(40), is_error
 410
 411    # Work out which packages have migrations and which do not
 412    executor = MigrationExecutor(get_connection(), migration_progress_callback)
 413
 414    # Raise an error if any migrations are applied before their dependencies.
 415    executor.loader.check_consistent_history(executor.connection)
 416
 417    # Before anything else, see if there's conflicting packages and drop out
 418    # hard if there are any
 419    conflicts = executor.loader.detect_conflicts()
 420    if conflicts:
 421        name_str = "; ".join(
 422            "{} in {}".format(", ".join(names), package)
 423            for package, names in conflicts.items()
 424        )
 425        raise click.ClickException(
 426            "Conflicting migrations detected; multiple leaf nodes in the "
 427            f"migration graph: ({name_str})."
 428        )
 429
 430    # If they supplied command line arguments, work out what they mean.
 431    target_package_labels_only = True
 432    targets: list[tuple[str, str]]
 433    if package_label:
 434        try:
 435            packages_registry.get_package_config(package_label)
 436        except LookupError as err:
 437            raise click.ClickException(str(err))
 438
 439        if package_label not in executor.loader.migrated_packages:
 440            raise click.ClickException(
 441                f"Package '{package_label}' does not have migrations."
 442            )
 443
 444    if package_label and migration_name:
 445        try:
 446            migration = executor.loader.get_migration_by_prefix(
 447                package_label, migration_name
 448            )
 449        except AmbiguityError:
 450            raise click.ClickException(
 451                f"More than one migration matches '{migration_name}' in package '{package_label}'. "
 452                "Please be more specific."
 453            )
 454        except KeyError:
 455            raise click.ClickException(
 456                f"Cannot find a migration matching '{migration_name}' from package '{package_label}'."
 457            )
 458        target: tuple[str, str] = (package_label, migration.name)
 459        if (
 460            target not in executor.loader.graph.nodes
 461            and target in executor.loader.replacements
 462        ):
 463            incomplete_migration = executor.loader.replacements[target]
 464            target = incomplete_migration.replaces[-1]
 465        targets = [target]
 466        target_package_labels_only = False
 467    elif package_label:
 468        targets = [
 469            key for key in executor.loader.graph.leaf_nodes() if key[0] == package_label
 470        ]
 471    else:
 472        targets = list(executor.loader.graph.leaf_nodes())
 473
 474    migration_plan = executor.migration_plan(targets)
 475
 476    if plan:
 477        if not quiet:
 478            click.secho("Planned operations:", fg="cyan")
 479            if not migration_plan:
 480                click.echo("  No planned migration operations.")
 481            else:
 482                for migration in migration_plan:
 483                    click.secho(str(migration), fg="cyan")
 484                    for operation in migration.operations:
 485                        message, is_error = describe_operation(operation)
 486                        if is_error:
 487                            click.secho("    " + message, fg="yellow")
 488                        else:
 489                            click.echo("    " + message)
 490        if check_unapplied:
 491            sys.exit(1)
 492        return
 493
 494    if check_unapplied:
 495        if migration_plan:
 496            sys.exit(1)
 497        return
 498
 499    # Print some useful info
 500    if not quiet:
 501        if not target_package_labels_only:
 502            click.secho("Target: ", bold=True, nl=False)
 503            click.secho(f"{targets[0][1]} from {targets[0][0]}", dim=True)
 504            click.echo()  # Add newline after target
 505        elif package_label:
 506            # Only show package name when explicitly targeting a single package
 507            click.secho("Package: ", bold=True, nl=False)
 508            click.secho(package_label, dim=True)
 509            click.echo()  # Add newline after package
 510
 511    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)
 512
 513    if migration_plan:
 514        # Determine whether to use atomic batch
 515        use_atomic_batch = False
 516        atomic_batch_message = None
 517        if len(migration_plan) > 1:
 518            # Check if all migrations support atomic
 519            non_atomic_migrations = [m for m in migration_plan if not m.atomic]
 520
 521            if atomic_batch is True:
 522                # User explicitly requested atomic batch
 523                if non_atomic_migrations:
 524                    names = ", ".join(
 525                        f"{m.package_label}.{m.name}" for m in non_atomic_migrations[:3]
 526                    )
 527                    if len(non_atomic_migrations) > 3:
 528                        names += f", and {len(non_atomic_migrations) - 3} more"
 529                    raise click.UsageError(
 530                        f"--atomic-batch requested but these migrations have atomic=False: {names}"
 531                    )
 532                use_atomic_batch = True
 533                atomic_batch_message = (
 534                    f"Running {len(migration_plan)} migrations in atomic batch"
 535                )
 536            elif atomic_batch is False:
 537                # User explicitly disabled atomic batch
 538                use_atomic_batch = False
 539                if len(migration_plan) > 1:
 540                    atomic_batch_message = (
 541                        f"Running {len(migration_plan)} migrations separately"
 542                    )
 543            else:
 544                # Auto-detect (atomic_batch is None)
 545                # Use atomic batch by default
 546                if not non_atomic_migrations:
 547                    use_atomic_batch = True
 548                    atomic_batch_message = (
 549                        f"Running {len(migration_plan)} migrations in atomic batch"
 550                    )
 551                else:
 552                    use_atomic_batch = False
 553                    if len(migration_plan) > 1:
 554                        atomic_batch_message = f"Running {len(migration_plan)} migrations separately (some have atomic=False)"
 555
 556        if backup or (backup is None and settings.DEBUG):
 557            backup_name = time.strftime("%Y%m%d_%H%M%S")
 558            if not quiet:
 559                click.secho("Creating backup: ", bold=True, nl=False)
 560                click.secho(f"{backup_name}", dim=True, nl=False)
 561                click.secho("... ", dim=True, nl=False)
 562
 563            backups_handler = DatabaseBackups()
 564            backups_handler.create(
 565                backup_name,
 566                source="migrate",
 567                pg_dump=os.environ.get("PG_DUMP", "pg_dump"),
 568            )
 569
 570            if not quiet:
 571                click.echo(click.style("OK", fg="green"))
 572                click.echo()  # Add blank line after backup output
 573        else:
 574            if not quiet:
 575                click.echo()  # Add blank line after packages/target info
 576
 577        if not quiet:
 578            if atomic_batch_message:
 579                click.secho(
 580                    f"Applying migrations ({atomic_batch_message.lower()}):", bold=True
 581                )
 582            else:
 583                click.secho("Applying migrations:", bold=True)
 584        post_migrate_state = executor.migrate(
 585            targets,
 586            plan=migration_plan,
 587            state=pre_migrate_state.clone(),
 588            fake=fake,
 589            atomic_batch=use_atomic_batch,
 590        )
 591        # post_migrate signals have access to all models. Ensure that all models
 592        # are reloaded in case any are delayed.
 593        post_migrate_state.clear_delayed_models_cache()
 594        post_migrate_packages = post_migrate_state.models_registry
 595
 596        # Re-render models of real packages to include relationships now that
 597        # we've got a final state. This wouldn't be necessary if real packages
 598        # models were rendered with relationships in the first place.
 599        with post_migrate_packages.bulk_update():
 600            model_keys = []
 601            for model_state in post_migrate_packages.real_models:
 602                model_key = model_state.package_label, model_state.name_lower
 603                model_keys.append(model_key)
 604                post_migrate_packages.unregister_model(*model_key)
 605        post_migrate_packages.render_multiple(
 606            [
 607                ModelState.from_model(models_registry.get_model(*model))
 608                for model in model_keys
 609            ]
 610        )
 611
 612    else:
 613        if not quiet:
 614            click.echo("No migrations to apply.")
 615            # If there's changes that aren't in migrations yet, tell them
 616            # how to fix it.
 617            autodetector = MigrationAutodetector(
 618                executor.loader.project_state(),
 619                ProjectState.from_models_registry(models_registry),
 620            )
 621            changes = autodetector.changes(graph=executor.loader.graph)
 622            if changes:
 623                packages = ", ".join(sorted(changes))
 624                click.echo(
 625                    f"Your models have changes that are not yet reflected in migrations ({packages})."
 626                )
 627                click.echo(
 628                    "Run 'plain makemigrations' to create migrations for these changes."
 629                )
 630
 631
 632@cli.command("list")
 633@click.argument("package_labels", nargs=-1)
 634@click.option(
 635    "--format",
 636    type=click.Choice(["list", "plan"]),
 637    default="list",
 638    help="Output format.",
 639)
 640@click.option(
 641    "-v",
 642    "--verbosity",
 643    type=int,
 644    default=1,
 645    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 646)
 647def list_migrations(
 648    package_labels: tuple[str, ...], format: str, verbosity: int
 649) -> None:
 650    """Show all migrations"""
 651
 652    def _validate_package_names(package_names: tuple[str, ...]) -> None:
 653        has_bad_names = False
 654        for package_name in package_names:
 655            try:
 656                packages_registry.get_package_config(package_name)
 657            except LookupError as err:
 658                click.echo(str(err), err=True)
 659                has_bad_names = True
 660        if has_bad_names:
 661            sys.exit(2)
 662
 663    def show_list(
 664        connection: DatabaseConnection, package_names: tuple[str, ...]
 665    ) -> None:
 666        """
 667        Show a list of all migrations on the system, or only those of
 668        some named packages.
 669        """
 670        # Load migrations from disk/DB
 671        loader = MigrationLoader(connection, ignore_no_migrations=True)
 672        recorder = MigrationRecorder(connection)
 673        recorded_migrations = recorder.applied_migrations()
 674
 675        graph = loader.graph
 676        # If we were passed a list of packages, validate it
 677        package_names_list: list[str]
 678        if package_names:
 679            _validate_package_names(package_names)
 680            package_names_list = list(package_names)
 681        # Otherwise, show all packages in alphabetic order
 682        else:
 683            package_names_list = sorted(loader.migrated_packages)
 684        # For each app, print its migrations in order from oldest (roots) to
 685        # newest (leaves).
 686        for package_name in package_names_list:
 687            click.secho(package_name, fg="cyan", bold=True)
 688            shown = set()
 689            for node in graph.leaf_nodes(package_name):
 690                for plan_node in graph.forwards_plan(node):
 691                    if plan_node not in shown and plan_node[0] == package_name:
 692                        # Give it a nice title if it's a squashed one
 693                        title = plan_node[1]
 694                        migration_node = graph.nodes[plan_node]
 695                        if migration_node and migration_node.replaces:
 696                            title += (
 697                                f" ({len(migration_node.replaces)} squashed migrations)"
 698                            )
 699                        applied_migration = (
 700                            loader.applied_migrations.get(plan_node)
 701                            if loader.applied_migrations
 702                            else None
 703                        )
 704                        # Mark it as applied/unapplied
 705                        if applied_migration:
 706                            if plan_node in recorded_migrations:
 707                                output = f" [X] {title}"
 708                            else:
 709                                title += " Run `plain migrate` to finish recording."
 710                                output = f" [-] {title}"
 711                            if verbosity >= 2 and hasattr(applied_migration, "applied"):
 712                                output += f" (applied at {applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')})"
 713                            click.echo(output)
 714                        else:
 715                            click.echo(f" [ ] {title}")
 716                        shown.add(plan_node)
 717            # If we didn't print anything, then a small message
 718            if not shown:
 719                click.secho(" (no migrations)", fg="red")
 720
 721    def show_plan(
 722        connection: DatabaseConnection, package_names: tuple[str, ...]
 723    ) -> None:
 724        """
 725        Show all known migrations (or only those of the specified package_names)
 726        in the order they will be applied.
 727        """
 728        # Load migrations from disk/DB
 729        loader = MigrationLoader(connection)
 730        assert loader.applied_migrations is not None
 731        graph = loader.graph
 732        if package_names:
 733            _validate_package_names(package_names)
 734            targets = [key for key in graph.leaf_nodes() if key[0] in package_names]
 735        else:
 736            targets = graph.leaf_nodes()
 737        plan = []
 738        seen = set()
 739
 740        # Generate the plan
 741        for target in targets:
 742            for migration in graph.forwards_plan(target):
 743                if migration not in seen:
 744                    node = graph.node_map[migration]
 745                    plan.append(node)
 746                    seen.add(migration)
 747
 748        # Output
 749        def print_deps(node: Any) -> str:
 750            out = []
 751            for parent in sorted(node.parents):
 752                out.append(f"{parent.key[0]}.{parent.key[1]}")
 753            if out:
 754                return f" ... ({', '.join(out)})"
 755            return ""
 756
 757        for node in plan:
 758            deps = ""
 759            if verbosity >= 2:
 760                deps = print_deps(node)
 761            if node.key in loader.applied_migrations:
 762                click.echo(f"[X]  {node.key[0]}.{node.key[1]}{deps}")
 763            else:
 764                click.echo(f"[ ]  {node.key[0]}.{node.key[1]}{deps}")
 765        if not plan:
 766            click.secho("(no migrations)", fg="red")
 767
 768    # Get the database we're operating from
 769
 770    conn = get_connection()
 771    if format == "plan":
 772        show_plan(conn, package_labels)
 773    else:
 774        show_list(conn, package_labels)
 775
 776
 777@cli.command("prune")
 778@click.option(
 779    "--yes",
 780    is_flag=True,
 781    help="Skip confirmation prompt (for non-interactive use).",
 782)
 783def prune(yes: bool) -> None:
 784    """Remove stale migration records from the database"""
 785    # Load migrations from disk and database
 786    conn = get_connection()
 787    loader = MigrationLoader(conn, ignore_no_migrations=True)
 788    assert loader.disk_migrations is not None
 789    recorder = MigrationRecorder(conn)
 790    recorded_migrations = recorder.applied_migrations()
 791
 792    # Find all prunable migrations (recorded but not on disk)
 793    all_prunable = [
 794        migration
 795        for migration in recorded_migrations
 796        if migration not in loader.disk_migrations
 797    ]
 798
 799    if not all_prunable:
 800        click.echo("No stale migration records found.")
 801        return
 802
 803    # Separate into existing packages vs orphaned packages
 804    existing_packages = set(loader.migrated_packages)
 805    prunable_existing: dict[str, list[str]] = {}
 806    prunable_orphaned: dict[str, list[str]] = {}
 807
 808    for migration in all_prunable:
 809        package, name = migration
 810        if package in existing_packages:
 811            if package not in prunable_existing:
 812                prunable_existing[package] = []
 813            prunable_existing[package].append(name)
 814        else:
 815            if package not in prunable_orphaned:
 816                prunable_orphaned[package] = []
 817            prunable_orphaned[package].append(name)
 818
 819    # Display what was found
 820    if prunable_existing:
 821        click.secho(
 822            "Stale migration records (from existing packages):",
 823            fg="yellow",
 824            bold=True,
 825        )
 826        for package in sorted(prunable_existing.keys()):
 827            click.secho(f"  {package}:", fg="yellow")
 828            for name in sorted(prunable_existing[package]):
 829                click.echo(f"    - {name}")
 830        click.echo()
 831
 832    if prunable_orphaned:
 833        click.secho(
 834            "Orphaned migration records (from removed packages):",
 835            fg="red",
 836            bold=True,
 837        )
 838        for package in sorted(prunable_orphaned.keys()):
 839            click.secho(f"  {package}:", fg="red")
 840            for name in sorted(prunable_orphaned[package]):
 841                click.echo(f"    - {name}")
 842        click.echo()
 843
 844    total_count = sum(len(migs) for migs in prunable_existing.values()) + sum(
 845        len(migs) for migs in prunable_orphaned.values()
 846    )
 847
 848    if not yes:
 849        click.echo(
 850            f"Found {total_count} stale migration record{'s' if total_count != 1 else ''}."
 851        )
 852        click.echo()
 853
 854        # Prompt for confirmation if interactive
 855        if not click.confirm(
 856            "Do you want to remove these migrations from the database?"
 857        ):
 858            return
 859
 860    # Actually prune the migrations
 861    click.secho("Pruning migrations...", bold=True)
 862
 863    for package, migration_names in prunable_existing.items():
 864        for name in sorted(migration_names):
 865            click.echo(f"  Pruning {package}.{name}...", nl=False)
 866            recorder.record_unapplied(package, name)
 867            click.echo(" OK")
 868
 869    for package, migration_names in prunable_orphaned.items():
 870        for name in sorted(migration_names):
 871            click.echo(f"  Pruning {package}.{name} (orphaned)...", nl=False)
 872            recorder.record_unapplied(package, name)
 873            click.echo(" OK")
 874
 875    click.secho(
 876        f"✓ Removed {total_count} stale migration record{'s' if total_count != 1 else ''}.",
 877        fg="green",
 878    )
 879
 880
 881@cli.command("squash")
 882@click.argument("package_label")
 883@click.argument("start_migration_name", required=False)
 884@click.argument("migration_name")
 885@click.option(
 886    "--no-optimize",
 887    is_flag=True,
 888    help="Do not try to optimize the squashed operations.",
 889)
 890@click.option(
 891    "--noinput",
 892    "--no-input",
 893    "no_input",
 894    is_flag=True,
 895    help="Tells Plain to NOT prompt the user for input of any kind.",
 896)
 897@click.option("--squashed-name", help="Sets the name of the new squashed migration.")
 898@click.option(
 899    "-v",
 900    "--verbosity",
 901    type=int,
 902    default=1,
 903    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
 904)
 905def squash(
 906    package_label: str,
 907    start_migration_name: str | None,
 908    migration_name: str,
 909    no_optimize: bool,
 910    no_input: bool,
 911    squashed_name: str | None,
 912    verbosity: int,
 913) -> None:
 914    """Squash multiple migrations into one"""
 915    interactive = not no_input
 916
 917    def find_migration(
 918        loader: MigrationLoader, package_label: str, name: str
 919    ) -> Migration:
 920        try:
 921            return loader.get_migration_by_prefix(package_label, name)
 922        except AmbiguityError:
 923            raise click.ClickException(
 924                f"More than one migration matches '{name}' in package '{package_label}'. Please be more specific."
 925            )
 926        except KeyError:
 927            raise click.ClickException(
 928                f"Cannot find a migration matching '{name}' from package '{package_label}'."
 929            )
 930
 931    # Validate package_label
 932    try:
 933        packages_registry.get_package_config(package_label)
 934    except LookupError as err:
 935        raise click.ClickException(str(err))
 936
 937    # Load the current graph state, check the app and migration they asked for exists
 938    loader = MigrationLoader(get_connection())
 939    if package_label not in loader.migrated_packages:
 940        raise click.ClickException(
 941            f"Package '{package_label}' does not have migrations (so squashmigrations on it makes no sense)"
 942        )
 943
 944    migration = find_migration(loader, package_label, migration_name)
 945
 946    # Work out the list of predecessor migrations
 947    migrations_to_squash: list[Migration] = []
 948    for al, mn in loader.graph.forwards_plan((migration.package_label, migration.name)):
 949        if al != migration.package_label:
 950            continue
 951        candidate = loader.get_migration(al, mn)
 952        if candidate is None:
 953            raise click.ClickException(f"Migration {mn} in package {al} is missing")
 954        migrations_to_squash.append(candidate)
 955
 956    if start_migration_name:
 957        start_migration = find_migration(loader, package_label, start_migration_name)
 958        start = loader.get_migration(
 959            start_migration.package_label, start_migration.name
 960        )
 961        if start is None:
 962            raise click.ClickException(
 963                f"Cannot find migration '{start_migration.name}' in package '{package_label}'."
 964            )
 965        try:
 966            start_index = migrations_to_squash.index(start)
 967            migrations_to_squash = migrations_to_squash[start_index:]
 968        except ValueError:
 969            raise click.ClickException(
 970                f"The migration '{start_migration}' cannot be found. Maybe it comes after "
 971                f"the migration '{migration}'?\n"
 972                f"Have a look at:\n"
 973                f"  plain migrations list {package_label}\n"
 974                f"to debug this issue."
 975            )
 976
 977    # Tell them what we're doing and optionally ask if we should proceed
 978    if verbosity > 0 or interactive:
 979        click.secho("Will squash the following migrations:", fg="cyan", bold=True)
 980        for migration in migrations_to_squash:
 981            click.echo(f" - {migration.name}")
 982
 983        if interactive:
 984            if not click.confirm("Do you wish to proceed?"):
 985                return
 986
 987    # Load the operations from all those migrations and concat together,
 988    # along with collecting external dependencies and detecting double-squashing
 989    operations = []
 990    dependencies = set()
 991    # We need to take all dependencies from the first migration in the list
 992    # as it may be 0002 depending on 0001
 993    first_migration = True
 994    for smigration in migrations_to_squash:
 995        if smigration.replaces:
 996            raise click.ClickException(
 997                "You cannot squash squashed migrations! Please transition it to a "
 998                "normal migration first"
 999            )
1000        operations.extend(smigration.operations)
1001        for dependency in smigration.dependencies:
1002            if isinstance(dependency, SettingsTuple):
1003                dependencies.add(dependency)
1004            elif dependency[0] != smigration.package_label or first_migration:
1005                dependencies.add(dependency)
1006        first_migration = False
1007
1008    if no_optimize:
1009        if verbosity > 0:
1010            click.secho("(Skipping optimization.)", fg="yellow")
1011        new_operations = operations
1012    else:
1013        if verbosity > 0:
1014            click.secho("Optimizing...", fg="cyan")
1015
1016        optimizer = MigrationOptimizer()
1017        new_operations = optimizer.optimize(operations, migration.package_label)
1018
1019        if verbosity > 0:
1020            if len(new_operations) == len(operations):
1021                click.echo("  No optimizations possible.")
1022            else:
1023                click.echo(
1024                    f"  Optimized from {len(operations)} operations to {len(new_operations)} operations."
1025                )
1026
1027    # Work out the value of replaces (any squashed ones we're re-squashing)
1028    # need to feed their replaces into ours
1029    replaces: list[tuple[str, str]] = []
1030    for migration in migrations_to_squash:
1031        if migration.replaces:
1032            replaces.extend(migration.replaces)
1033        else:
1034            replaces.append((migration.package_label, migration.name))
1035
1036    # Make a new migration with those operations
1037    subclass = type(
1038        "Migration",
1039        (migrations.Migration,),
1040        {
1041            "dependencies": dependencies,
1042            "operations": new_operations,
1043            "replaces": replaces,
1044        },
1045    )
1046    if start_migration_name:
1047        if squashed_name:
1048            # Use the name from --squashed-name
1049            prefix, _ = start_migration.name.split("_", 1)
1050            name = f"{prefix}_{squashed_name}"
1051        else:
1052            # Generate a name
1053            name = f"{start_migration.name}_squashed_{migration.name}"
1054        new_migration = subclass(name, package_label)
1055    else:
1056        name = f"0001_{'squashed_' + migration.name if not squashed_name else squashed_name}"
1057        new_migration = subclass(name, package_label)
1058        new_migration.initial = True
1059
1060    # Write out the new migration file
1061    writer = MigrationWriter(new_migration)
1062    if os.path.exists(writer.path):
1063        raise click.ClickException(
1064            f"Migration {new_migration.name} already exists. Use a different name."
1065        )
1066    with open(writer.path, "w", encoding="utf-8") as fh:
1067        fh.write(writer.as_string())
1068
1069    if verbosity > 0:
1070        click.secho(
1071            f"Created new squashed migration {writer.path}", fg="green", bold=True
1072        )
1073        click.echo(
1074            "  You should commit this migration but leave the old ones in place;\n"
1075            "  the new migration will be used for new installs. Once you are sure\n"
1076            "  all instances of the codebase have applied the migrations you squashed,\n"
1077            "  you can delete them."
1078        )
1079        if writer.needs_manual_porting:
1080            click.secho("Manual porting required", fg="yellow", bold=True)
1081            click.echo(
1082                "  Your migrations contained functions that must be manually copied over,\n"
1083                "  as we could not safely copy their implementation.\n"
1084                "  See the comment at the top of the squashed migration for details."
1085            )