1from __future__ import annotations
2
3import os
4import subprocess
5import sys
6import time
7from typing import TYPE_CHECKING, Any, cast
8
9import click
10
11from plain.cli import register_cli
12from plain.packages import packages_registry
13from plain.runtime import settings
14from plain.utils.text import Truncator
15
16from . import migrations
17from .backups.cli import cli as backups_cli
18from .backups.core import DatabaseBackups
19from .db import OperationalError
20from .db import db_connection as _db_connection
21from .migrations.autodetector import MigrationAutodetector
22from .migrations.executor import MigrationExecutor
23from .migrations.loader import AmbiguityError, MigrationLoader
24from .migrations.migration import Migration, SettingsTuple
25from .migrations.optimizer import MigrationOptimizer
26from .migrations.questioner import (
27 InteractiveMigrationQuestioner,
28 NonInteractiveMigrationQuestioner,
29)
30from .migrations.recorder import MigrationRecorder
31from .migrations.state import ModelState, ProjectState
32from .migrations.writer import MigrationWriter
33from .registry import models_registry
34
# These imports are only evaluated by static type checkers; at runtime the
# names are needed solely inside (lazily evaluated) annotations.
if TYPE_CHECKING:
    from .backends.base.base import BaseDatabaseWrapper
    from .migrations.operations.base import Operation

    # Present the imported connection object to type checkers as a
    # BaseDatabaseWrapper.
    db_connection = cast("BaseDatabaseWrapper", _db_connection)
else:
    # At runtime the imported object is used unchanged, without the cast.
    db_connection = _db_connection
42
43
# Root click group for the commands defined in this module; it is registered
# with Plain's CLI under the name "models". Documentation is kept in comments
# because click would surface a docstring as the group's help text.
@register_cli("models")
@click.group()
def cli() -> None:
    pass


# Attach the command object imported from .backups.cli to this group.
cli.add_command(backups_cli)
51
52
@cli.command()
@click.argument("parameters", nargs=-1)
def db_shell(parameters: tuple[str, ...]) -> None:
    """Runs the command-line client for specified database, or the default database if none is provided."""
    # Extra CLI parameters are forwarded verbatim to the database client.
    try:
        db_connection.client.runshell(list(parameters))
    except subprocess.CalledProcessError as exc:
        # The client ran but failed: report its command line and propagate
        # its exit status as our own.
        command_line = " ".join(exc.cmd)
        click.secho(
            f'"{command_line}" returned non-zero exit status {exc.returncode}.',
            fg="red",
            err=True,
        )
        sys.exit(exc.returncode)
    except FileNotFoundError:
        # This is assumed to mean the client executable is missing. Other
        # causes are possible, in which case the message is inaccurate, but
        # it covers the common case.
        click.secho(
            f"You appear not to have the {db_connection.client.executable_name!r} program installed or on your path.",
            fg="red",
            err=True,
        )
        sys.exit(1)
80
81
@cli.command()
def db_wait() -> None:
    """Wait for the database to be ready"""
    attempt = 0
    while True:
        attempt += 1
        try:
            db_connection.ensure_connection()
        except OperationalError:
            # The first failed attempt stays silent; progress is reported
            # from the second attempt onwards.
            if attempt > 1:
                click.secho(
                    f"Waiting for database (attempt {attempt})",
                    fg="yellow",
                )
            time.sleep(1.5)
        else:
            click.secho("✔ Database ready", fg="green")
            return
106
107
@cli.command(name="list")
@click.argument("package_labels", nargs=-1)
@click.option(
    "--app-only",
    is_flag=True,
    help="Only show models from packages that start with 'app'.",
)
def list_models(package_labels: tuple[str, ...], app_only: bool) -> None:
    """List installed models."""

    wanted_labels = set(package_labels)

    def by_package_then_name(model: Any) -> tuple[str, str]:
        # Sort key: package label first, then model name.
        options = model.model_options
        return (options.package_label, options.model_name)

    ordered_models = sorted(models_registry.get_models(), key=by_package_then_name)

    for model in ordered_models:
        label = model.model_options.package_label
        package_name = packages_registry.get_package_config(label).name

        # A model is skipped when either active filter rejects it.
        rejected_by_flag = app_only and not package_name.startswith("app")
        rejected_by_label = bool(wanted_labels) and label not in wanted_labels
        if rejected_by_flag or rejected_by_label:
            continue

        field_names = ", ".join(
            field.name for field in model._model_meta.get_fields()
        )
        styled_label = click.style(label, fg="cyan")
        styled_model = click.style(model.__name__, fg="blue")
        click.echo(f"{styled_label}.{styled_model}")
        click.echo(f" table: {model.model_options.db_table}")
        click.echo(f" fields: {field_names}")
        click.echo(f" package: {package_name}\n")
137
138
@register_cli("makemigrations")
@cli.command()
@click.argument("package_labels", nargs=-1)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Just show what migrations would be made; don't actually write them.",
)
@click.option("--empty", is_flag=True, help="Create an empty migration.")
@click.option(
    "--noinput",
    "--no-input",
    "no_input",
    is_flag=True,
    help="Tells Plain to NOT prompt the user for input of any kind.",
)
@click.option("-n", "--name", help="Use this name for migration file(s).")
@click.option(
    "--check",
    is_flag=True,
    help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.",
)
@click.option(
    "-v",
    "--verbosity",
    type=int,
    default=1,
    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
)
def makemigrations(
    package_labels: tuple[str, ...],
    dry_run: bool,
    empty: bool,
    no_input: bool,
    name: str | None,
    check: bool,
    verbosity: int,
) -> None:
    """Creates new migration(s) for packages."""
    # Flow: validate the package labels, load the migration graph, refuse to
    # continue on inconsistent history or conflicting leaf nodes, then either
    # write empty migrations (--empty) or autodetect changes and write them.
    # Exit codes: 2 for unknown package labels, 1 when --check finds changes.

    written_files: list[str] = []
    interactive = not no_input
    migration_name = name
    check_changes = check

    def log(msg: str, level: int = 1) -> None:
        """Echo *msg* when the requested verbosity is at least *level*."""
        if verbosity >= level:
            click.echo(msg)

    def display_path(path: str) -> str:
        """Return *path* relative to the working directory when possible."""
        try:
            return os.path.relpath(path)
        except ValueError:
            # os.path.relpath raises ValueError on Windows when the path and
            # the working directory are on different drives.
            return path

    def write_migration_files(
        changes: dict[str, list[Migration]],
        update_previous_migration_paths: dict[str, str] | None = None,
    ) -> None:
        """Take a changes dict and write them out as migration files."""
        # Packages whose migrations directory (and __init__.py) has already
        # been ensured during this call.
        prepared_packages: set[str] = set()
        for package_label, package_migrations in changes.items():
            log(
                click.style(f"Migrations for '{package_label}':", fg="cyan", bold=True),
                level=1,
            )
            for migration in package_migrations:
                writer = MigrationWriter(migration)
                # Kept separate from the file contents so later messages
                # always show the path.
                migration_path = display_path(writer.path)
                log(f" {click.style(migration_path, fg='yellow')}\n", level=1)
                for operation in migration.operations:
                    log(f" - {operation.describe()}", level=1)

                if not dry_run:
                    migrations_directory = os.path.dirname(writer.path)
                    if package_label not in prepared_packages:
                        os.makedirs(migrations_directory, exist_ok=True)
                        init_path = os.path.join(migrations_directory, "__init__.py")
                        if not os.path.isfile(init_path):
                            # Create an empty __init__.py.
                            with open(init_path, "w"):
                                pass
                        prepared_packages.add(package_label)

                    with open(writer.path, "w", encoding="utf-8") as fh:
                        fh.write(writer.as_string())
                        written_files.append(writer.path)

                    if update_previous_migration_paths:
                        prev_path = update_previous_migration_paths[package_label]
                        if writer.needs_manual_porting:
                            log(
                                click.style(
                                    f"Updated migration {migration_path} requires manual porting.\n"
                                    f"Previous migration {display_path(prev_path)} was kept and "
                                    f"must be deleted after porting functions manually.",
                                    fg="yellow",
                                ),
                                level=1,
                            )
                        else:
                            os.remove(prev_path)
                            log(f"Deleted {display_path(prev_path)}", level=1)
                elif verbosity >= 3:
                    # In a dry run at the highest verbosity, print the file
                    # contents instead of writing them.
                    log(
                        click.style(
                            f"Full migrations file '{writer.filename}':",
                            fg="cyan",
                            bold=True,
                        ),
                        level=3,
                    )
                    log(writer.as_string(), level=3)

    # Validate package labels; report every bad label before exiting.
    package_labels_set = set(package_labels)
    has_bad_labels = False
    for package_label in package_labels_set:
        try:
            packages_registry.get_package_config(package_label)
        except LookupError as err:
            click.echo(str(err), err=True)
            has_bad_labels = True
    if has_bad_labels:
        sys.exit(2)

    # Load the current graph state
    loader = MigrationLoader(None, ignore_no_migrations=True)

    # Raise an error if any migrations are applied before their dependencies.
    # Only the default db_connection is supported.
    loader.check_consistent_history(db_connection)

    # Check for conflicts, limited to the requested packages when given.
    conflicts = loader.detect_conflicts()
    if package_labels_set:
        conflicts = {
            package_label: conflict
            for package_label, conflict in conflicts.items()
            if package_label in package_labels_set
        }

    if conflicts:
        name_str = "; ".join(
            "{} in {}".format(", ".join(names), package)
            for package, names in conflicts.items()
        )
        raise click.ClickException(
            f"Conflicting migrations detected; multiple leaf nodes in the "
            f"migration graph: ({name_str})."
        )

    # Set up questioner
    if interactive:
        questioner = InteractiveMigrationQuestioner(
            specified_packages=package_labels_set,
            dry_run=dry_run,
        )
    else:
        questioner = NonInteractiveMigrationQuestioner(
            specified_packages=package_labels_set,
            dry_run=dry_run,
            verbosity=verbosity,
        )

    # Set up autodetector
    autodetector = MigrationAutodetector(
        loader.project_state(),
        ProjectState.from_models_registry(models_registry),
        questioner,
    )

    # Handle empty migrations if requested
    if empty:
        if not package_labels_set:
            raise click.ClickException(
                "You must supply at least one package label when using --empty."
            )
        changes = {
            package: [Migration("custom", package)] for package in package_labels_set
        }
        changes = autodetector.arrange_for_graph(
            changes=changes,
            graph=loader.graph,
            migration_name=migration_name,
        )
        write_migration_files(changes)
        return

    # Detect changes
    changes = autodetector.changes(
        graph=loader.graph,
        trim_to_packages=package_labels_set or None,
        convert_packages=package_labels_set or None,
        migration_name=migration_name,
    )

    if not changes:
        if package_labels_set:
            noun = "package" if len(package_labels_set) == 1 else "packages"
            # Sorted so the message is stable across runs.
            labels = ", ".join(sorted(package_labels_set))
            log(f"No changes detected in {noun} '{labels}'", level=1)
        else:
            log("No changes detected", level=1)
        return

    if check_changes:
        # --check only reports through the exit status; nothing is written.
        sys.exit(1)

    write_migration_files(changes)
342
343
def _resolve_atomic_batch(
    migration_plan: list[Migration], atomic_batch: bool | None
) -> tuple[bool, str | None]:
    """Decide whether a plan runs in one transaction and how to announce it.

    Returns ``(use_atomic_batch, message)``. For plans with at most one
    migration the result is ``(False, None)``.

    Raises click.UsageError when ``atomic_batch`` is True but the database
    cannot roll back DDL or some planned migration has ``atomic=False``.
    """
    count = len(migration_plan)
    if count <= 1:
        return False, None

    can_rollback_ddl = db_connection.features.can_rollback_ddl
    non_atomic_migrations = [m for m in migration_plan if not m.atomic]

    if atomic_batch is True:
        # Explicit request: fail loudly instead of silently downgrading.
        if not can_rollback_ddl:
            raise click.UsageError(
                f"--atomic-batch not supported on {db_connection.vendor}. "
                "Remove the flag or use a database that supports transactional DDL."
            )
        if non_atomic_migrations:
            names = ", ".join(
                f"{m.package_label}.{m.name}" for m in non_atomic_migrations[:3]
            )
            if len(non_atomic_migrations) > 3:
                names += f", and {len(non_atomic_migrations) - 3} more"
            raise click.UsageError(
                f"--atomic-batch requested but these migrations have atomic=False: {names}"
            )
        return True, f"Running {count} migrations in atomic batch"

    if atomic_batch is False:
        # Explicitly disabled.
        return False, f"Running {count} migrations separately"

    # Auto-detect (atomic_batch is None).
    if can_rollback_ddl and not non_atomic_migrations:
        return True, f"Running {count} migrations in atomic batch"
    if not can_rollback_ddl:
        return (
            False,
            f"Running {count} migrations separately ({db_connection.vendor} doesn't support batch)",
        )
    # Only remaining case: DDL can be rolled back but some migration opted out.
    return False, f"Running {count} migrations separately (some have atomic=False)"


@register_cli("migrate")
@cli.command()
@click.argument("package_label", required=False)
@click.argument("migration_name", required=False)
@click.option(
    "--fake", is_flag=True, help="Mark migrations as run without actually running them."
)
@click.option(
    "--plan",
    is_flag=True,
    help="Shows a list of the migration actions that will be performed.",
)
@click.option(
    "--check",
    "check_unapplied",
    is_flag=True,
    help="Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.",
)
@click.option(
    "--backup/--no-backup",
    "backup",
    is_flag=True,
    default=None,
    help="Explicitly enable/disable pre-migration backups.",
)
@click.option(
    "--no-input",
    "--noinput",
    "no_input",
    is_flag=True,
    help="Tells Plain to NOT prompt the user for input of any kind.",
)
@click.option(
    "--atomic-batch/--no-atomic-batch",
    default=None,
    help="Run migrations in a single transaction (auto-detected by default)",
)
@click.option(
    "--quiet",
    is_flag=True,
    help="Suppress migration output (used for test database creation).",
)
def migrate(
    package_label: str | None,
    migration_name: str | None,
    fake: bool,
    plan: bool,
    check_unapplied: bool,
    backup: bool | None,
    no_input: bool,
    atomic_batch: bool | None,
    quiet: bool,
) -> None:
    """Updates database schema. Manages both packages with migrations and those without."""
    # Flow: prepare the database, build the executor, refuse to continue on
    # inconsistent history or conflicting leaf nodes, resolve the targets from
    # the arguments, then show the plan (--plan), report pending work through
    # the exit status (--check), or apply the plan.

    def migration_progress_callback(
        action: str,
        *,
        migration: Migration | None = None,
        fake: bool = False,
        operation: Operation | None = None,
        sql_statements: list[str] | None = None,
    ) -> None:
        # Progress hook handed to the executor; silent under --quiet.
        if quiet:
            return

        if action == "apply_start":
            click.echo()  # Always add newline between migrations
            if fake:
                click.secho(f"{migration} (faked)", fg="cyan")
            else:
                click.secho(f"{migration}", fg="cyan")
        elif action == "apply_success":
            pass  # Already shown via operations
        elif action == "operation_start":
            click.echo(f" {operation.describe()}", nl=False)
            click.secho("... ", dim=True, nl=False)
        elif action == "operation_success":
            # Show SQL statements (no OK needed, SQL implies success)
            if sql_statements:
                click.echo()  # newline after "..."
                for sql in sql_statements:
                    click.secho(f" {sql}", dim=True)
            else:
                # No SQL: just add a newline
                click.echo()

    def describe_operation(operation: Any) -> tuple[str, bool]:
        """Return a string that describes a migration operation for --plan."""
        # `prefix` and `is_error` are never changed here; they are kept so the
        # (message, is_error) return shape stays the same for the caller.
        prefix = ""
        is_error = False
        if hasattr(operation, "code"):
            code = operation.code
            action = (code.__doc__ or "") if code else None
        elif hasattr(operation, "sql"):
            action = operation.sql
        else:
            action = ""
        if action is not None:
            action = str(action).replace("\n", "")
        if action:
            action = " -> " + action
        truncated = Truncator(action)
        return prefix + operation.describe() + truncated.chars(40), is_error

    # Hook for backends needing any database preparation
    db_connection.prepare_database()

    # Work out which packages have migrations and which do not
    executor = MigrationExecutor(db_connection, migration_progress_callback)

    # Raise an error if any migrations are applied before their dependencies.
    executor.loader.check_consistent_history(db_connection)

    # Before anything else, see if there's conflicting packages and drop out
    # hard if there are any
    conflicts = executor.loader.detect_conflicts()
    if conflicts:
        name_str = "; ".join(
            "{} in {}".format(", ".join(names), package)
            for package, names in conflicts.items()
        )
        raise click.ClickException(
            "Conflicting migrations detected; multiple leaf nodes in the "
            f"migration graph: ({name_str})."
        )

    # If they supplied command line arguments, work out what they mean.
    target_package_labels_only = True
    targets: list[tuple[str, str]]
    if package_label:
        try:
            packages_registry.get_package_config(package_label)
        except LookupError as err:
            raise click.ClickException(str(err)) from err

        if package_label not in executor.loader.migrated_packages:
            raise click.ClickException(
                f"Package '{package_label}' does not have migrations."
            )

    if package_label and migration_name:
        try:
            migration = executor.loader.get_migration_by_prefix(
                package_label, migration_name
            )
        except AmbiguityError as err:
            raise click.ClickException(
                f"More than one migration matches '{migration_name}' in package '{package_label}'. "
                "Please be more specific."
            ) from err
        except KeyError as err:
            raise click.ClickException(
                f"Cannot find a migration matching '{migration_name}' from package '{package_label}'."
            ) from err
        target: tuple[str, str] = (package_label, migration.name)
        # When the target is not a graph node but is listed in the loader's
        # replacements, aim at the last migration that entry replaces.
        if (
            target not in executor.loader.graph.nodes
            and target in executor.loader.replacements
        ):
            incomplete_migration = executor.loader.replacements[target]
            target = incomplete_migration.replaces[-1]  # type: ignore[assignment]
        targets = [target]
        target_package_labels_only = False
    elif package_label:
        targets = [
            key for key in executor.loader.graph.leaf_nodes() if key[0] == package_label
        ]
    else:
        targets = list(executor.loader.graph.leaf_nodes())

    migration_plan = executor.migration_plan(targets)

    if plan:
        if not quiet:
            click.secho("Planned operations:", fg="cyan")
            if not migration_plan:
                click.echo(" No planned migration operations.")
            else:
                for migration in migration_plan:
                    click.secho(str(migration), fg="cyan")
                    for operation in migration.operations:
                        message, is_error = describe_operation(operation)
                        if is_error:
                            click.secho(" " + message, fg="yellow")
                        else:
                            click.echo(" " + message)
        # --check reports pending migrations through the exit status; that
        # must not depend on whether the plan was printed.
        if check_unapplied and migration_plan:
            sys.exit(1)
        return

    if check_unapplied:
        if migration_plan:
            sys.exit(1)
        return

    # Print some useful info
    if not quiet:
        if target_package_labels_only:
            packages = ", ".join(sorted({a for a, n in targets})) or "(none)"
            click.secho("Packages: ", bold=True, nl=False)
            click.secho(packages, dim=True)
            click.echo()  # Add newline after packages
        else:
            click.secho("Target: ", bold=True, nl=False)
            click.secho(f"{targets[0][1]} from {targets[0][0]}", dim=True)
            click.echo()  # Add newline after target

    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)

    if migration_plan:
        use_atomic_batch, atomic_batch_message = _resolve_atomic_batch(
            migration_plan, atomic_batch
        )

        # Back up when explicitly requested, or by default when DEBUG is on.
        if backup or (backup is None and settings.DEBUG):
            backup_name = f"migrate_{time.strftime('%Y%m%d_%H%M%S')}"
            if not quiet:
                click.secho("Creating backup: ", bold=True, nl=False)
                click.secho(f"{backup_name}", dim=True, nl=False)
                click.secho("... ", dim=True, nl=False)

            backups_handler = DatabaseBackups()
            backups_handler.create(
                backup_name,
                pg_dump=os.environ.get("PG_DUMP", "pg_dump"),
            )

            if not quiet:
                click.echo(click.style("OK", fg="green"))
                click.echo()  # Add blank line after backup output
        else:
            if not quiet:
                click.echo()  # Add blank line after packages/target info

        if not quiet:
            if atomic_batch_message:
                click.secho(
                    f"Applying migrations ({atomic_batch_message.lower()}):", bold=True
                )
            else:
                click.secho("Applying migrations:", bold=True)
        post_migrate_state = executor.migrate(
            targets,
            plan=migration_plan,
            state=pre_migrate_state.clone(),
            fake=fake,
            atomic_batch=use_atomic_batch,
        )
        # post_migrate signals have access to all models. Ensure that all models
        # are reloaded in case any are delayed.
        post_migrate_state.clear_delayed_models_cache()
        post_migrate_packages = post_migrate_state.models_registry

        # Re-render models of real packages to include relationships now that
        # we've got a final state. This wouldn't be necessary if real packages
        # models were rendered with relationships in the first place.
        with post_migrate_packages.bulk_update():
            model_keys = []
            for model_state in post_migrate_packages.real_models:
                model_key = model_state.package_label, model_state.name_lower
                model_keys.append(model_key)
                post_migrate_packages.unregister_model(*model_key)
        post_migrate_packages.render_multiple(
            [
                ModelState.from_model(models_registry.get_model(*model))
                for model in model_keys
            ]
        )

    else:
        if not quiet:
            click.echo("No migrations to apply.")
            # If there's changes that aren't in migrations yet, tell them
            # how to fix it.
            autodetector = MigrationAutodetector(
                executor.loader.project_state(),
                ProjectState.from_models_registry(models_registry),
            )
            changes = autodetector.changes(graph=executor.loader.graph)
            if changes:
                packages = ", ".join(sorted(changes))
                click.echo(
                    f"Your models have changes that are not yet reflected in migrations ({packages})."
                )
                click.echo(
                    "Run 'plain makemigrations' to create migrations for these changes."
                )
685
686
@cli.command()
@click.argument("package_labels", nargs=-1)
@click.option(
    "--format",
    type=click.Choice(["list", "plan"]),
    default="list",
    help="Output format.",
)
@click.option(
    "-v",
    "--verbosity",
    type=int,
    default=1,
    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
)
def show_migrations(
    package_labels: tuple[str, ...], format: str, verbosity: int
) -> None:
    """Shows all available migrations for the current project"""

    def _validate_package_names(package_names: tuple[str, ...]) -> None:
        # Report every unknown package on stderr, then exit with status 2 if
        # at least one lookup failed.
        any_unknown = False
        for candidate in package_names:
            try:
                packages_registry.get_package_config(candidate)
            except LookupError as err:
                click.echo(str(err), err=True)
                any_unknown = True
        if any_unknown:
            sys.exit(2)

    def show_list(db_connection: Any, package_names: tuple[str, ...]) -> None:
        """
        Print each package's migrations, oldest first, with a marker showing
        whether each one is applied. Covers all migrated packages unless
        specific package names are given.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(db_connection, ignore_no_migrations=True)
        recorded_migrations = MigrationRecorder(db_connection).applied_migrations()
        graph = loader.graph

        package_names_list: list[str]
        if package_names:
            # Explicit packages are validated and shown in the order given.
            _validate_package_names(package_names)
            package_names_list = list(package_names)
        else:
            # Otherwise every migrated package, alphabetically.
            package_names_list = sorted(loader.migrated_packages)

        for package_name in package_names_list:
            click.secho(package_name, fg="cyan", bold=True)
            shown = set()
            for leaf in graph.leaf_nodes(package_name):
                for plan_node in graph.forwards_plan(leaf):
                    # Skip nodes already printed and nodes of other packages.
                    if plan_node in shown or plan_node[0] != package_name:
                        continue

                    title = plan_node[1]
                    replaced = graph.nodes[plan_node].replaces  # type: ignore[union-attr]
                    if replaced:
                        # Squashed migrations get an annotated title.
                        title += f" ({len(replaced)} squashed migrations)"

                    applied_migration = loader.applied_migrations.get(plan_node)  # type: ignore[union-attr]
                    if not applied_migration:
                        click.echo(f" [ ] {title}")
                    else:
                        if plan_node in recorded_migrations:
                            output = f" [X] {title}"
                        else:
                            # The loader treats it as applied but there is no
                            # matching row from the recorder.
                            title += " Run `plain migrate` to finish recording."
                            output = f" [-] {title}"
                        if verbosity >= 2 and hasattr(applied_migration, "applied"):
                            output += f" (applied at {applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')})"
                        click.echo(output)
                    shown.add(plan_node)
            # If we didn't print anything, then a small message
            if not shown:
                click.secho(" (no migrations)", fg="red")

    def show_plan(db_connection: Any, package_names: tuple[str, ...]) -> None:
        """
        Print all known migrations, or only those leading to the given
        packages' leaf nodes, in the order they will be applied.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(db_connection)
        graph = loader.graph
        if package_names:
            _validate_package_names(package_names)
            targets = [key for key in graph.leaf_nodes() if key[0] in package_names]
        else:
            targets = graph.leaf_nodes()

        # Collect each graph node once, in forwards-plan order.
        ordered_nodes = []
        seen = set()
        for target in targets:
            for key in graph.forwards_plan(target):
                if key in seen:
                    continue
                ordered_nodes.append(graph.node_map[key])
                seen.add(key)

        def print_deps(node: Any) -> str:
            # Suffix listing the node's parents, or "" when it has none.
            labels = [
                f"{parent.key[0]}.{parent.key[1]}" for parent in sorted(node.parents)
            ]
            return f" ... ({', '.join(labels)})" if labels else ""

        for node in ordered_nodes:
            deps = print_deps(node) if verbosity >= 2 else ""
            marker = "[X]" if node.key in loader.applied_migrations else "[ ]"  # type: ignore[operator]
            click.echo(f"{marker} {node.key[0]}.{node.key[1]}{deps}")
        if not ordered_nodes:
            click.secho("(no migrations)", fg="red")

    # Anything other than "plan" falls back to the list view.
    render = show_plan if format == "plan" else show_list
    render(db_connection, package_labels)
817
818
@cli.command()
@click.option(
    "--yes",
    is_flag=True,
    help="Skip confirmation prompt (for non-interactive use).",
)
def prune_migrations(yes: bool) -> None:
    """Show and optionally remove stale migration records from the database."""
    # Load migrations from disk and database
    loader = MigrationLoader(db_connection, ignore_no_migrations=True)
    recorder = MigrationRecorder(db_connection)
    recorded_migrations = recorder.applied_migrations()

    # A record is prunable when it is recorded but not among the loader's
    # disk migrations.
    all_prunable = [
        key
        for key in recorded_migrations
        if key not in loader.disk_migrations  # type: ignore[operator]
    ]
    if not all_prunable:
        click.echo("No stale migration records found.")
        return

    # Group by package, split by whether the package still has migrations.
    existing_packages = set(loader.migrated_packages)
    prunable_existing: dict[str, list[str]] = {}
    prunable_orphaned: dict[str, list[str]] = {}
    for package, name in all_prunable:
        bucket = prunable_existing if package in existing_packages else prunable_orphaned
        bucket.setdefault(package, []).append(name)

    def report(heading: str, grouped: dict[str, list[str]], color: str) -> None:
        # Print one group: heading, then packages and names sorted.
        click.secho(heading, fg=color, bold=True)
        for package in sorted(grouped):
            click.secho(f" {package}:", fg=color)
            for name in sorted(grouped[package]):
                click.echo(f" - {name}")
        click.echo()

    # Display what was found
    if prunable_existing:
        report(
            "Stale migration records (from existing packages):",
            prunable_existing,
            "yellow",
        )
    if prunable_orphaned:
        report(
            "Orphaned migration records (from removed packages):",
            prunable_orphaned,
            "red",
        )

    total_count = sum(
        len(names)
        for grouped in (prunable_existing, prunable_orphaned)
        for names in grouped.values()
    )
    plural = "s" if total_count != 1 else ""

    if not yes:
        click.echo(f"Found {total_count} stale migration record{plural}.")
        click.echo()

        # Without --yes, nothing is removed unless the user confirms.
        confirmed = click.confirm(
            "Do you want to remove these migrations from the database?"
        )
        if not confirmed:
            return

    # Actually prune the migrations
    click.secho("Pruning migrations...", bold=True)

    def prune(grouped: dict[str, list[str]], suffix: str) -> None:
        # Mark every record in the group as unapplied, echoing progress.
        for package, migration_names in grouped.items():
            for name in sorted(migration_names):
                click.echo(f" Pruning {package}.{name}{suffix}...", nl=False)
                recorder.record_unapplied(package, name)
                click.echo(" OK")

    prune(prunable_existing, "")
    prune(prunable_orphaned, " (orphaned)")

    click.secho(
        f"✓ Removed {total_count} stale migration record{plural}.",
        fg="green",
    )
919
920
921@cli.command()
922@click.argument("package_label")
923@click.argument("start_migration_name", required=False)
924@click.argument("migration_name")
925@click.option(
926 "--no-optimize",
927 is_flag=True,
928 help="Do not try to optimize the squashed operations.",
929)
930@click.option(
931 "--noinput",
932 "--no-input",
933 "no_input",
934 is_flag=True,
935 help="Tells Plain to NOT prompt the user for input of any kind.",
936)
937@click.option("--squashed-name", help="Sets the name of the new squashed migration.")
938@click.option(
939 "-v",
940 "--verbosity",
941 type=int,
942 default=1,
943 help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
944)
945def squash_migrations(
946 package_label: str,
947 start_migration_name: str | None,
948 migration_name: str,
949 no_optimize: bool,
950 no_input: bool,
951 squashed_name: str | None,
952 verbosity: int,
953) -> None:
954 """
955 Squashes an existing set of migrations (from first until specified) into a single new one.
956 """
957 interactive = not no_input
958
959 def find_migration(
960 loader: MigrationLoader, package_label: str, name: str
961 ) -> Migration:
962 try:
963 return loader.get_migration_by_prefix(package_label, name)
964 except AmbiguityError:
965 raise click.ClickException(
966 f"More than one migration matches '{name}' in package '{package_label}'. Please be more specific."
967 )
968 except KeyError:
969 raise click.ClickException(
970 f"Cannot find a migration matching '{name}' from package '{package_label}'."
971 )
972
973 # Validate package_label
974 try:
975 packages_registry.get_package_config(package_label)
976 except LookupError as err:
977 raise click.ClickException(str(err))
978
979 # Load the current graph state, check the app and migration they asked for exists
980 loader = MigrationLoader(db_connection)
981 if package_label not in loader.migrated_packages:
982 raise click.ClickException(
983 f"Package '{package_label}' does not have migrations (so squashmigrations on it makes no sense)"
984 )
985
986 migration = find_migration(loader, package_label, migration_name)
987
988 # Work out the list of predecessor migrations
989 migrations_to_squash = [
990 loader.get_migration(al, mn)
991 for al, mn in loader.graph.forwards_plan(
992 (migration.package_label, migration.name)
993 )
994 if al == migration.package_label
995 ]
996
997 if start_migration_name:
998 start_migration = find_migration(loader, package_label, start_migration_name)
999 start = loader.get_migration(
1000 start_migration.package_label, start_migration.name
1001 )
1002 try:
1003 start_index = migrations_to_squash.index(start)
1004 migrations_to_squash = migrations_to_squash[start_index:]
1005 except ValueError:
1006 raise click.ClickException(
1007 f"The migration '{start_migration}' cannot be found. Maybe it comes after "
1008 f"the migration '{migration}'?\n"
1009 f"Have a look at:\n"
1010 f" plain models show-migrations {package_label}\n"
1011 f"to debug this issue."
1012 )
1013
1014 # Tell them what we're doing and optionally ask if we should proceed
1015 if verbosity > 0 or interactive:
1016 click.secho("Will squash the following migrations:", fg="cyan", bold=True)
1017 for migration in migrations_to_squash:
1018 click.echo(f" - {migration.name}")
1019
1020 if interactive:
1021 if not click.confirm("Do you wish to proceed?"):
1022 return
1023
1024 # Load the operations from all those migrations and concat together,
1025 # along with collecting external dependencies and detecting double-squashing
1026 operations = []
1027 dependencies = set()
1028 # We need to take all dependencies from the first migration in the list
1029 # as it may be 0002 depending on 0001
1030 first_migration = True
1031 for smigration in migrations_to_squash:
1032 if smigration.replaces:
1033 raise click.ClickException(
1034 "You cannot squash squashed migrations! Please transition it to a "
1035 "normal migration first"
1036 )
1037 operations.extend(smigration.operations)
1038 for dependency in smigration.dependencies:
1039 if isinstance(dependency, SettingsTuple):
1040 dependencies.add(dependency)
1041 elif dependency[0] != smigration.package_label or first_migration:
1042 dependencies.add(dependency)
1043 first_migration = False
1044
1045 if no_optimize:
1046 if verbosity > 0:
1047 click.secho("(Skipping optimization.)", fg="yellow")
1048 new_operations = operations
1049 else:
1050 if verbosity > 0:
1051 click.secho("Optimizing...", fg="cyan")
1052
1053 optimizer = MigrationOptimizer()
1054 new_operations = optimizer.optimize(operations, migration.package_label)
1055
1056 if verbosity > 0:
1057 if len(new_operations) == len(operations):
1058 click.echo(" No optimizations possible.")
1059 else:
1060 click.echo(
1061 f" Optimized from {len(operations)} operations to {len(new_operations)} operations."
1062 )
1063
1064 # Work out the value of replaces (any squashed ones we're re-squashing)
1065 # need to feed their replaces into ours
1066 replaces = []
1067 for migration in migrations_to_squash:
1068 if migration.replaces:
1069 replaces.extend(migration.replaces)
1070 else:
1071 replaces.append((migration.package_label, migration.name))
1072
1073 # Make a new migration with those operations
1074 subclass = type(
1075 "Migration",
1076 (migrations.Migration,),
1077 {
1078 "dependencies": dependencies,
1079 "operations": new_operations,
1080 "replaces": replaces,
1081 },
1082 )
1083 if start_migration_name:
1084 if squashed_name:
1085 # Use the name from --squashed-name
1086 prefix, _ = start_migration.name.split("_", 1)
1087 name = f"{prefix}_{squashed_name}"
1088 else:
1089 # Generate a name
1090 name = f"{start_migration.name}_squashed_{migration.name}"
1091 new_migration = subclass(name, package_label)
1092 else:
1093 name = f"0001_{'squashed_' + migration.name if not squashed_name else squashed_name}"
1094 new_migration = subclass(name, package_label)
1095 new_migration.initial = True
1096
1097 # Write out the new migration file
1098 writer = MigrationWriter(new_migration)
1099 if os.path.exists(writer.path):
1100 raise click.ClickException(
1101 f"Migration {new_migration.name} already exists. Use a different name."
1102 )
1103 with open(writer.path, "w", encoding="utf-8") as fh:
1104 fh.write(writer.as_string())
1105
1106 if verbosity > 0:
1107 click.secho(
1108 f"Created new squashed migration {writer.path}", fg="green", bold=True
1109 )
1110 click.echo(
1111 " You should commit this migration but leave the old ones in place;\n"
1112 " the new migration will be used for new installs. Once you are sure\n"
1113 " all instances of the codebase have applied the migrations you squashed,\n"
1114 " you can delete them."
1115 )
1116 if writer.needs_manual_porting:
1117 click.secho("Manual porting required", fg="yellow", bold=True)
1118 click.echo(
1119 " Your migrations contained functions that must be manually copied over,\n"
1120 " as we could not safely copy their implementation.\n"
1121 " See the comment at the top of the squashed migration for details."
1122 )