1from __future__ import annotations
2
3import os
4import sys
5import time
6from typing import TYPE_CHECKING, Any
7
8import click
9
10from plain.cli import register_cli
11from plain.cli.runtime import common_command
12from plain.packages import packages_registry
13from plain.runtime import settings
14from plain.utils.text import Truncator
15
16from .. import migrations
17from ..backups.core import DatabaseBackups
18from ..db import get_connection
19from ..migrations.autodetector import MigrationAutodetector
20from ..migrations.executor import MigrationExecutor
21from ..migrations.loader import AmbiguityError, MigrationLoader
22from ..migrations.migration import Migration, SettingsTuple
23from ..migrations.optimizer import MigrationOptimizer
24from ..migrations.questioner import (
25 InteractiveMigrationQuestioner,
26 NonInteractiveMigrationQuestioner,
27)
28from ..migrations.recorder import MigrationRecorder
29from ..migrations.state import ModelState, ProjectState
30from ..migrations.writer import MigrationWriter
31from ..registry import models_registry
32
33if TYPE_CHECKING:
34 from ..connection import DatabaseConnection
35 from ..migrations.operations.base import Operation
36
37
# Root click group that the migration subcommands below attach to via
# ``@cli.command(...)``. It is passed to ``register_cli`` under the name
# "migrations". The docstring is user-facing: click shows it as the group's
# help text, so keep it short.
@register_cli("migrations")
@click.group()
def cli() -> None:
    """Database migration management"""
42
43
@common_command
@register_cli("makemigrations", shortcut_for="migrations make")
@cli.command("make")
@click.argument("package_labels", nargs=-1)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Just show what migrations would be made; don't actually write them.",
)
@click.option("--empty", is_flag=True, help="Create an empty migration.")
@click.option(
    "--noinput",
    "--no-input",
    "no_input",
    is_flag=True,
    help="Tells Plain to NOT prompt the user for input of any kind.",
)
@click.option("-n", "--name", help="Use this name for migration file(s).")
@click.option(
    "--check",
    is_flag=True,
    help="Exit with a non-zero status if model changes are missing migrations and don't actually write them.",
)
@click.option(
    "-v",
    "--verbosity",
    type=int,
    default=1,
    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
)
def make(
    package_labels: tuple[str, ...],
    dry_run: bool,
    empty: bool,
    no_input: bool,
    name: str | None,
    check: bool,
    verbosity: int,
) -> None:
    """Create new database migrations"""
    # The docstring above is the command's --help text, so details live here.
    #
    # Flow:
    #   1. validate the requested package labels (exit status 2 on bad labels)
    #   2. load the migration graph and refuse to continue on inconsistent
    #      history or conflicting leaf nodes
    #   3. either build empty migrations (--empty) or autodetect model changes
    #   4. write the migration files, or with --dry-run preview them plus
    #      the SQL they would run; --check exits with status 1 instead of writing
    #   5. warn about packages that have models but no migrations module

    written_files: list[str] = []

    def log(msg: str, level: int = 1) -> None:
        """Echo ``msg`` when the requested verbosity is at least ``level``."""
        if verbosity >= level:
            click.echo(msg)

    def collect_sql_for_migration(
        migration: Migration,
        project_state: ProjectState,
    ) -> tuple[list[str], ProjectState]:
        """Apply a migration in collect mode and return the SQL statements."""
        with get_connection().schema_editor(collect_sql=True) as editor:
            new_state = migration.apply(project_state, editor)
        return list(editor.executed_sql), new_state

    def write_migration_files(
        changes: dict[str, list[Migration]],
        update_previous_migration_paths: dict[str, str] | None = None,
    ) -> None:
        """Take a changes dict and write them out as migration files.

        In dry-run mode nothing is written; the SQL each migration would run
        is printed instead (and the full file at verbosity 3).
        """
        packages_with_directory: set[str] = set()
        # Track state for SQL collection in dry-run mode.
        sql_state = loader.project_state() if dry_run else None

        for package_label, package_migrations in changes.items():
            log(
                click.style(f"Migrations for '{package_label}':", fg="cyan", bold=True),
                level=1,
            )
            for migration in package_migrations:
                writer = MigrationWriter(migration)
                # Kept separate from the file contents below so later messages
                # can still refer to the path.
                migration_path = os.path.relpath(writer.path)
                log(f" {click.style(migration_path, fg='yellow')}\n", level=1)
                for operation in migration.operations:
                    log(f" - {operation.describe()}", level=1)

                if dry_run:
                    # Show SQL preview and optionally the full file.
                    assert sql_state is not None
                    sql_statements, sql_state = collect_sql_for_migration(
                        migration, sql_state
                    )
                    if sql_statements:
                        log("", level=1)
                        log(click.style(" SQL:", fg="green", bold=True), level=1)
                        for sql in sql_statements:
                            log(f" {sql};", level=1)

                    # Guarded so the file is only rendered when it is shown.
                    if verbosity >= 3:
                        click.echo(
                            click.style(
                                f"\n Full migrations file '{writer.filename}':",
                                fg="cyan",
                                bold=True,
                            )
                        )
                        click.echo(writer.as_string())
                    continue

                # Make sure the migrations package exists (once per package).
                if package_label not in packages_with_directory:
                    migrations_directory = os.path.dirname(writer.path)
                    os.makedirs(migrations_directory, exist_ok=True)
                    init_path = os.path.join(migrations_directory, "__init__.py")
                    if not os.path.isfile(init_path):
                        with open(init_path, "w"):
                            pass
                    packages_with_directory.add(package_label)

                migration_source = writer.as_string()
                with open(writer.path, "w", encoding="utf-8") as fh:
                    fh.write(migration_source)
                    written_files.append(writer.path)

                if update_previous_migration_paths:
                    prev_path = update_previous_migration_paths[package_label]
                    if writer.needs_manual_porting:
                        log(
                            click.style(
                                f"Updated migration {migration_path} requires manual porting.\n"
                                f"Previous migration {os.path.relpath(prev_path)} was kept and "
                                f"must be deleted after porting functions manually.",
                                fg="yellow",
                            ),
                            level=1,
                        )
                    else:
                        os.remove(prev_path)
                        log(f"Deleted {os.path.relpath(prev_path)}", level=1)

    # Validate package labels. Sorted so messages come out in a stable order.
    package_labels_set = set(package_labels)
    sorted_labels = sorted(package_labels_set)
    has_bad_labels = False
    for package_label in sorted_labels:
        try:
            packages_registry.get_package_config(package_label)
        except LookupError as err:
            click.echo(str(err), err=True)
            has_bad_labels = True
    if has_bad_labels:
        sys.exit(2)

    # Load the current graph state
    loader = MigrationLoader(None, ignore_no_migrations=True)

    # Raise an error if any migrations are applied before their dependencies.
    loader.check_consistent_history(get_connection())

    # Check for conflicts (only in the requested packages, if any were given)
    conflicts = loader.detect_conflicts()
    if package_labels_set:
        conflicts = {
            package_label: conflict
            for package_label, conflict in conflicts.items()
            if package_label in package_labels_set
        }

    if conflicts:
        name_str = "; ".join(
            "{} in {}".format(", ".join(names), package)
            for package, names in conflicts.items()
        )
        raise click.ClickException(
            f"Conflicting migrations detected; multiple leaf nodes in the "
            f"migration graph: ({name_str})."
        )

    # Set up questioner
    if no_input:
        questioner = NonInteractiveMigrationQuestioner(
            specified_packages=package_labels_set,
            dry_run=dry_run,
            verbosity=verbosity,
        )
    else:
        questioner = InteractiveMigrationQuestioner(
            specified_packages=package_labels_set,
            dry_run=dry_run,
        )

    # Set up autodetector
    autodetector = MigrationAutodetector(
        loader.project_state(),
        ProjectState.from_models_registry(models_registry),
        questioner,
    )

    # Handle empty migrations if requested
    if empty:
        if not package_labels_set:
            raise click.ClickException(
                "You must supply at least one package label when using --empty."
            )
        changes = {package: [Migration("custom", package)] for package in sorted_labels}
        changes = autodetector.arrange_for_graph(
            changes=changes,
            graph=loader.graph,
            migration_name=name,
        )
        write_migration_files(changes)
        return

    # Detect changes
    changes = autodetector.changes(
        graph=loader.graph,
        trim_to_packages=package_labels_set or None,
        convert_packages=package_labels_set or None,
        migration_name=name,
    )

    if not changes:
        if not package_labels_set:
            log("No changes detected", level=1)
        else:
            noun = "package" if len(package_labels_set) == 1 else "packages"
            log(
                f"No changes detected in {noun} '{', '.join(sorted_labels)}'",
                level=1,
            )
    else:
        if check:
            # --check: report missing migrations through the exit status only.
            sys.exit(1)

        write_migration_files(changes)

    # Warn about packages that have models but no migrations directory.
    # These are silently skipped by the autodetector, which can be confusing
    # when setting up a new app (makemigrations says "No changes detected").
    unmigrated_with_models = []
    for package_label in sorted(loader.unmigrated_packages):
        module_name, _explicit = MigrationLoader.migrations_module(package_label)
        # Skip packages that explicitly opt out of migrations (module_name is None).
        if module_name is not None and models_registry.all_models.get(package_label):
            unmigrated_with_models.append((package_label, module_name))
    if unmigrated_with_models:
        click.echo()
        click.echo(
            click.style(
                "Warning: The following packages have models but no migrations directory:",
                fg="yellow",
            )
        )
        for package_label, module_name in unmigrated_with_models:
            module_path = module_name.replace(".", "/")
            click.echo(
                f" - {package_label} (create {module_path}/ to enable migrations)"
            )
        click.echo()
        click.echo(
            "To create initial migrations, add the directory and run "
            + click.style("plain makemigrations", bold=True)
            + " again."
        )
302
303
@common_command
@register_cli("migrate", shortcut_for="migrations apply")
@cli.command("apply")
@click.argument("package_label", required=False)
@click.argument("migration_name", required=False)
@click.option(
    "--fake", is_flag=True, help="Mark migrations as run without actually running them."
)
@click.option(
    "--plan",
    is_flag=True,
    help="Shows a list of the migration actions that will be performed.",
)
@click.option(
    "--check",
    "check_unapplied",
    is_flag=True,
    help="Exits with a non-zero status if unapplied migrations exist and does not actually apply migrations.",
)
@click.option(
    "--backup/--no-backup",
    "backup",
    is_flag=True,
    default=None,
    help="Explicitly enable/disable pre-migration backups.",
)
@click.option(
    "--no-input",
    "--noinput",
    "no_input",
    is_flag=True,
    help="Tells Plain to NOT prompt the user for input of any kind.",
)
@click.option(
    "--atomic-batch/--no-atomic-batch",
    default=None,
    help="Run migrations in a single transaction (auto-detected by default)",
)
@click.option(
    "--quiet",
    is_flag=True,
    help="Suppress migration output (used for test database creation).",
)
def apply(
    package_label: str | None,
    migration_name: str | None,
    fake: bool,
    plan: bool,
    check_unapplied: bool,
    backup: bool | None,
    no_input: bool,
    atomic_batch: bool | None,
    quiet: bool,
) -> None:
    """Apply database migrations"""
    # The docstring above is the command's --help text, so details live here.
    #
    # Flow:
    #   1. build the executor, check history consistency and graph conflicts
    #   2. resolve the target nodes from the optional PACKAGE_LABEL /
    #      MIGRATION_NAME arguments
    #   3. --plan prints the plan and returns; --check only sets the exit status
    #   4. otherwise optionally back up the database, then run the plan
    #      (atomically as one batch when possible)

    def migration_progress_callback(
        action: str,
        *,
        migration: Migration | None = None,
        fake: bool = False,
        operation: Operation | None = None,
        sql_statements: list[str] | None = None,
    ) -> None:
        """Print progress for the events the executor reports."""
        if quiet:
            return

        if action == "apply_start":
            click.echo()  # Always add newline between migrations
            if fake:
                click.secho(f"{migration} (faked)", fg="cyan")
            else:
                click.secho(f"{migration}", fg="cyan")
        elif action == "operation_start":
            if operation is not None:
                click.echo(f" {operation.describe()}", nl=False)
                click.secho("... ", dim=True, nl=False)
        elif action == "operation_success":
            # Show SQL statements (no OK needed, SQL implies success)
            click.echo()  # newline after "..."
            for sql in sql_statements or ():
                click.secho(f" {sql}", dim=True)
        # "apply_success" and any other action need no output: the individual
        # operations have already been shown.

    def describe_operation(operation: Any) -> str:
        """Return a string that describes a migration operation for --plan."""
        if hasattr(operation, "code"):
            code = operation.code
            action = (code.__doc__ or "") if code else ""
        elif hasattr(operation, "sql"):
            action = operation.sql
        else:
            action = ""
        # A missing code/sql value must not be rendered as the text "None".
        action = "" if action is None else str(action).replace("\n", "")
        if action:
            action = " -> " + action
        return operation.describe() + Truncator(action).chars(40)

    # Work out which packages have migrations and which do not
    executor = MigrationExecutor(get_connection(), migration_progress_callback)

    # Raise an error if any migrations are applied before their dependencies.
    executor.loader.check_consistent_history(executor.connection)

    # Before anything else, see if there's conflicting packages and drop out
    # hard if there are any
    conflicts = executor.loader.detect_conflicts()
    if conflicts:
        name_str = "; ".join(
            "{} in {}".format(", ".join(names), package)
            for package, names in conflicts.items()
        )
        raise click.ClickException(
            "Conflicting migrations detected; multiple leaf nodes in the "
            f"migration graph: ({name_str})."
        )

    # If they supplied command line arguments, work out what they mean.
    target_package_labels_only = True
    targets: list[tuple[str, str]]
    if package_label:
        try:
            packages_registry.get_package_config(package_label)
        except LookupError as err:
            raise click.ClickException(str(err)) from err

        if package_label not in executor.loader.migrated_packages:
            raise click.ClickException(
                f"Package '{package_label}' does not have migrations."
            )

    if package_label and migration_name:
        try:
            migration = executor.loader.get_migration_by_prefix(
                package_label, migration_name
            )
        except AmbiguityError:
            raise click.ClickException(
                f"More than one migration matches '{migration_name}' in package '{package_label}'. "
                "Please be more specific."
            ) from None
        except KeyError:
            raise click.ClickException(
                f"Cannot find a migration matching '{migration_name}' from package '{package_label}'."
            ) from None
        target: tuple[str, str] = (package_label, migration.name)
        # A target that is missing from the graph but known as a replacement
        # is swapped for the last migration it replaces.
        if (
            target not in executor.loader.graph.nodes
            and target in executor.loader.replacements
        ):
            incomplete_migration = executor.loader.replacements[target]
            target = incomplete_migration.replaces[-1]
        targets = [target]
        target_package_labels_only = False
    elif package_label:
        targets = [
            key for key in executor.loader.graph.leaf_nodes() if key[0] == package_label
        ]
    else:
        targets = list(executor.loader.graph.leaf_nodes())

    migration_plan = executor.migration_plan(targets)

    if plan:
        if not quiet:
            click.secho("Planned operations:", fg="cyan")
            if not migration_plan:
                click.echo(" No planned migration operations.")
            for migration in migration_plan:
                click.secho(str(migration), fg="cyan")
                for operation in migration.operations:
                    click.echo(" " + describe_operation(operation))
        # --check: non-zero status only when unapplied migrations exist.
        if check_unapplied and migration_plan:
            sys.exit(1)
        return

    if check_unapplied:
        if migration_plan:
            sys.exit(1)
        return

    # Print some useful info
    if not quiet:
        if not target_package_labels_only:
            click.secho("Target: ", bold=True, nl=False)
            click.secho(f"{targets[0][1]} from {targets[0][0]}", dim=True)
            click.echo()  # Add newline after target
        elif package_label:
            # Only show package name when explicitly targeting a single package
            click.secho("Package: ", bold=True, nl=False)
            click.secho(package_label, dim=True)
            click.echo()  # Add newline after package

    pre_migrate_state = executor._create_project_state(with_applied_migrations=True)

    if migration_plan:
        # Determine whether to use atomic batch. A single migration is never
        # batched and gets no batch message.
        use_atomic_batch = False
        atomic_batch_message = None
        plan_size = len(migration_plan)
        if plan_size > 1:
            # Check if all migrations support atomic
            non_atomic_migrations = [m for m in migration_plan if not m.atomic]

            if atomic_batch is True:
                # User explicitly requested atomic batch
                if non_atomic_migrations:
                    names = ", ".join(
                        f"{m.package_label}.{m.name}" for m in non_atomic_migrations[:3]
                    )
                    if len(non_atomic_migrations) > 3:
                        names += f", and {len(non_atomic_migrations) - 3} more"
                    raise click.UsageError(
                        f"--atomic-batch requested but these migrations have atomic=False: {names}"
                    )
                use_atomic_batch = True
                atomic_batch_message = f"Running {plan_size} migrations in atomic batch"
            elif atomic_batch is False:
                # User explicitly disabled atomic batch
                atomic_batch_message = f"Running {plan_size} migrations separately"
            elif not non_atomic_migrations:
                # Auto-detect (atomic_batch is None): batch when every
                # migration allows it.
                use_atomic_batch = True
                atomic_batch_message = f"Running {plan_size} migrations in atomic batch"
            else:
                atomic_batch_message = f"Running {plan_size} migrations separately (some have atomic=False)"

        # Back up when asked to, or by default when settings.DEBUG is on.
        if backup or (backup is None and settings.DEBUG):
            backup_name = time.strftime("%Y%m%d_%H%M%S")
            if not quiet:
                click.secho("Creating backup: ", bold=True, nl=False)
                click.secho(f"{backup_name}", dim=True, nl=False)
                click.secho("... ", dim=True, nl=False)

            backups_handler = DatabaseBackups()
            backups_handler.create(
                backup_name,
                source="migrate",
                pg_dump=os.environ.get("PG_DUMP", "pg_dump"),
            )

            if not quiet:
                click.echo(click.style("OK", fg="green"))
                click.echo()  # Add blank line after backup output
        elif not quiet:
            click.echo()  # Add blank line after packages/target info

        if not quiet:
            if atomic_batch_message:
                click.secho(
                    f"Applying migrations ({atomic_batch_message.lower()}):", bold=True
                )
            else:
                click.secho("Applying migrations:", bold=True)
        post_migrate_state = executor.migrate(
            targets,
            plan=migration_plan,
            state=pre_migrate_state.clone(),
            fake=fake,
            atomic_batch=use_atomic_batch,
        )
        # post_migrate signals have access to all models. Ensure that all models
        # are reloaded in case any are delayed.
        post_migrate_state.clear_delayed_models_cache()
        post_migrate_packages = post_migrate_state.models_registry

        # Re-render models of real packages to include relationships now that
        # we've got a final state. This wouldn't be necessary if real packages
        # models were rendered with relationships in the first place.
        with post_migrate_packages.bulk_update():
            model_keys = []
            for model_state in post_migrate_packages.real_models:
                model_key = model_state.package_label, model_state.name_lower
                model_keys.append(model_key)
                post_migrate_packages.unregister_model(*model_key)
        post_migrate_packages.render_multiple(
            [
                ModelState.from_model(models_registry.get_model(*model))
                for model in model_keys
            ]
        )

    elif not quiet:
        click.echo("No migrations to apply.")
        # If there's changes that aren't in migrations yet, tell them
        # how to fix it.
        autodetector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_models_registry(models_registry),
        )
        changes = autodetector.changes(graph=executor.loader.graph)
        if changes:
            packages = ", ".join(sorted(changes))
            click.echo(
                f"Your models have changes that are not yet reflected in migrations ({packages})."
            )
            click.echo(
                "Run 'plain makemigrations' to create migrations for these changes."
            )
630
631
@cli.command("list")
@click.argument("package_labels", nargs=-1)
@click.option(
    "--format",
    type=click.Choice(["list", "plan"]),
    default="list",
    help="Output format.",
)
@click.option(
    "-v",
    "--verbosity",
    type=int,
    default=1,
    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
)
def list_migrations(
    package_labels: tuple[str, ...], format: str, verbosity: int
) -> None:
    """Show all migrations"""
    # The docstring above is the command's --help text, so details live here.
    # ``format`` picks between the per-package checklist ("list") and the
    # flat apply-order listing ("plan"); verbosity >= 2 adds extra detail.

    def _validate_package_names(package_names: tuple[str, ...]) -> None:
        """Report every unknown package name on stderr, then exit with status 2."""
        found_unknown = False
        for candidate in package_names:
            try:
                packages_registry.get_package_config(candidate)
            except LookupError as err:
                click.echo(str(err), err=True)
                found_unknown = True
        if found_unknown:
            sys.exit(2)

    def show_list(
        connection: DatabaseConnection, package_names: tuple[str, ...]
    ) -> None:
        """
        Print each package followed by its migrations, oldest first, marked
        as applied ([X]), applied but not recorded ([-]) or unapplied ([ ]).
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection, ignore_no_migrations=True)
        recorded_migrations = MigrationRecorder(connection).applied_migrations()
        graph = loader.graph

        # Explicit packages are validated and kept in the given order;
        # otherwise every migrated package is shown alphabetically.
        package_names_list: list[str]
        if package_names:
            _validate_package_names(package_names)
            package_names_list = list(package_names)
        else:
            package_names_list = sorted(loader.migrated_packages)

        for package_name in package_names_list:
            click.secho(package_name, fg="cyan", bold=True)
            shown = set()
            for leaf in graph.leaf_nodes(package_name):
                for plan_node in graph.forwards_plan(leaf):
                    # Skip other packages' nodes and anything already printed.
                    if plan_node in shown or plan_node[0] != package_name:
                        continue

                    # Give it a nice title if it's a squashed one
                    title = plan_node[1]
                    migration_node = graph.nodes[plan_node]
                    if migration_node and migration_node.replaces:
                        title += f" ({len(migration_node.replaces)} squashed migrations)"

                    applied_migration = None
                    if loader.applied_migrations:
                        applied_migration = loader.applied_migrations.get(plan_node)

                    if not applied_migration:
                        click.echo(f" [ ] {title}")
                    else:
                        if plan_node in recorded_migrations:
                            output = f" [X] {title}"
                        else:
                            title += " Run `plain migrate` to finish recording."
                            output = f" [-] {title}"
                        if verbosity >= 2 and hasattr(applied_migration, "applied"):
                            output += f" (applied at {applied_migration.applied.strftime('%Y-%m-%d %H:%M:%S')})"
                        click.echo(output)
                    shown.add(plan_node)
            # If we didn't print anything, then a small message
            if not shown:
                click.secho(" (no migrations)", fg="red")

    def show_plan(
        connection: DatabaseConnection, package_names: tuple[str, ...]
    ) -> None:
        """
        Print all known migrations (or only those leading to the leaves of
        the given packages) in the order they will be applied.
        """
        # Load migrations from disk/DB
        loader = MigrationLoader(connection)
        assert loader.applied_migrations is not None
        graph = loader.graph

        if not package_names:
            targets = graph.leaf_nodes()
        else:
            _validate_package_names(package_names)
            targets = [key for key in graph.leaf_nodes() if key[0] in package_names]

        # Generate the plan: every node on the way to each target, once.
        plan = []
        seen = set()
        for target in targets:
            for migration_key in graph.forwards_plan(target):
                if migration_key in seen:
                    continue
                plan.append(graph.node_map[migration_key])
                seen.add(migration_key)

        def print_deps(node: Any) -> str:
            """Format the node's parents as a trailing dependency list."""
            parent_labels = [
                f"{parent.key[0]}.{parent.key[1]}" for parent in sorted(node.parents)
            ]
            if not parent_labels:
                return ""
            return f" ... ({', '.join(parent_labels)})"

        # Output
        for node in plan:
            deps = print_deps(node) if verbosity >= 2 else ""
            if node.key in loader.applied_migrations:
                click.echo(f"[X] {node.key[0]}.{node.key[1]}{deps}")
            else:
                click.echo(f"[ ] {node.key[0]}.{node.key[1]}{deps}")
        if not plan:
            click.secho("(no migrations)", fg="red")

    # Get the database we're operating from
    conn = get_connection()
    renderer = show_plan if format == "plan" else show_list
    renderer(conn, package_labels)
775
776
@cli.command("prune")
@click.option(
    "--yes",
    is_flag=True,
    help="Skip confirmation prompt (for non-interactive use).",
)
def prune(yes: bool) -> None:
    """Remove stale migration records from the database"""
    # The docstring above is the command's --help text, so details live here.
    # A record is "stale" when it is recorded as applied but is not among the
    # loader's disk migrations. Stale records are grouped by whether their
    # package still has migrations ("existing") or not ("orphaned"), shown to
    # the user, and removed after confirmation unless --yes is given.

    # Load migrations from disk and database
    conn = get_connection()
    loader = MigrationLoader(conn, ignore_no_migrations=True)
    assert loader.disk_migrations is not None
    recorder = MigrationRecorder(conn)

    # Find all prunable migrations (recorded but not on disk)
    all_prunable = [
        key
        for key in recorder.applied_migrations()
        if key not in loader.disk_migrations
    ]
    if not all_prunable:
        click.echo("No stale migration records found.")
        return

    # Separate into existing packages vs orphaned packages
    existing_packages = set(loader.migrated_packages)
    prunable_existing: dict[str, list[str]] = {}
    prunable_orphaned: dict[str, list[str]] = {}
    for package, name in all_prunable:
        bucket = prunable_existing if package in existing_packages else prunable_orphaned
        bucket.setdefault(package, []).append(name)

    def show_group(heading: str, groups: dict[str, list[str]], color: str) -> None:
        """Print one heading followed by its packages and record names, sorted."""
        click.secho(heading, fg=color, bold=True)
        for package in sorted(groups):
            click.secho(f" {package}:", fg=color)
            for name in sorted(groups[package]):
                click.echo(f" - {name}")
        click.echo()

    # Display what was found
    if prunable_existing:
        show_group(
            "Stale migration records (from existing packages):",
            prunable_existing,
            "yellow",
        )
    if prunable_orphaned:
        show_group(
            "Orphaned migration records (from removed packages):",
            prunable_orphaned,
            "red",
        )

    # Every stale record landed in exactly one of the two groups.
    total_count = len(all_prunable)
    plural = "s" if total_count != 1 else ""

    if not yes:
        click.echo(f"Found {total_count} stale migration record{plural}.")
        click.echo()

        # Prompt for confirmation if interactive
        confirmed = click.confirm(
            "Do you want to remove these migrations from the database?"
        )
        if not confirmed:
            return

    # Actually prune the migrations
    click.secho("Pruning migrations...", bold=True)

    for package, migration_names in prunable_existing.items():
        for name in sorted(migration_names):
            click.echo(f" Pruning {package}.{name}...", nl=False)
            recorder.record_unapplied(package, name)
            click.echo(" OK")

    for package, migration_names in prunable_orphaned.items():
        for name in sorted(migration_names):
            click.echo(f" Pruning {package}.{name} (orphaned)...", nl=False)
            recorder.record_unapplied(package, name)
            click.echo(" OK")

    click.secho(
        f"✓ Removed {total_count} stale migration record{plural}.",
        fg="green",
    )
879
880
@cli.command("squash")
@click.argument("package_label")
@click.argument("start_migration_name", required=False)
@click.argument("migration_name", required=False)
@click.option(
    "--no-optimize",
    is_flag=True,
    help="Do not try to optimize the squashed operations.",
)
@click.option(
    "--noinput",
    "--no-input",
    "no_input",
    is_flag=True,
    help="Tells Plain to NOT prompt the user for input of any kind.",
)
@click.option("--squashed-name", help="Sets the name of the new squashed migration.")
@click.option(
    "-v",
    "--verbosity",
    type=int,
    default=1,
    help="Verbosity level; 0=minimal output, 1=normal output, 2=verbose output, 3=very verbose output",
)
def squash(
    package_label: str,
    start_migration_name: str | None,
    migration_name: str | None,
    no_optimize: bool,
    no_input: bool,
    squashed_name: str | None,
    verbosity: int,
) -> None:
    """Squash multiple migrations into one"""
    # The docstring above is the command's --help text, so details live here.
    #
    # Usage: squash PACKAGE_LABEL [START_MIGRATION_NAME] MIGRATION_NAME
    # Squashes the package's migrations up to and including MIGRATION_NAME
    # (optionally starting at START_MIGRATION_NAME) into one new migration
    # file whose ``replaces`` lists the originals.
    interactive = not no_input

    # click binds positionals left to right, so with only two values the
    # second one lands in start_migration_name. Shift it to migration_name so
    # the start really is optional. MIGRATION_NAME itself stays mandatory.
    if migration_name is None:
        if start_migration_name is None:
            raise click.UsageError("Missing argument 'MIGRATION_NAME'.")
        migration_name, start_migration_name = start_migration_name, None

    def find_migration(
        loader: MigrationLoader, package_label: str, name: str
    ) -> Migration:
        """Resolve a migration by name prefix, turning lookup failures into CLI errors."""
        try:
            return loader.get_migration_by_prefix(package_label, name)
        except AmbiguityError:
            raise click.ClickException(
                f"More than one migration matches '{name}' in package '{package_label}'. Please be more specific."
            ) from None
        except KeyError:
            raise click.ClickException(
                f"Cannot find a migration matching '{name}' from package '{package_label}'."
            ) from None

    # Validate package_label
    try:
        packages_registry.get_package_config(package_label)
    except LookupError as err:
        raise click.ClickException(str(err)) from err

    # Load the current graph state, check the app and migration they asked for exists
    loader = MigrationLoader(get_connection())
    if package_label not in loader.migrated_packages:
        raise click.ClickException(
            f"Package '{package_label}' does not have migrations (so squashmigrations on it makes no sense)"
        )

    migration = find_migration(loader, package_label, migration_name)

    # Work out the list of predecessor migrations (same package only)
    migrations_to_squash: list[Migration] = []
    for al, mn in loader.graph.forwards_plan((migration.package_label, migration.name)):
        if al != migration.package_label:
            continue
        candidate = loader.get_migration(al, mn)
        if candidate is None:
            raise click.ClickException(f"Migration {mn} in package {al} is missing")
        migrations_to_squash.append(candidate)

    start_migration: Migration | None = None
    if start_migration_name:
        start_migration = find_migration(loader, package_label, start_migration_name)
        start = loader.get_migration(
            start_migration.package_label, start_migration.name
        )
        if start is None:
            raise click.ClickException(
                f"Cannot find migration '{start_migration.name}' in package '{package_label}'."
            )
        try:
            start_index = migrations_to_squash.index(start)
        except ValueError:
            raise click.ClickException(
                f"The migration '{start_migration}' cannot be found. Maybe it comes after "
                f"the migration '{migration}'?\n"
                f"Have a look at:\n"
                f" plain migrations list {package_label}\n"
                f"to debug this issue."
            ) from None
        migrations_to_squash = migrations_to_squash[start_index:]

    # Tell them what we're doing and optionally ask if we should proceed
    if verbosity > 0 or interactive:
        click.secho("Will squash the following migrations:", fg="cyan", bold=True)
        for listed in migrations_to_squash:
            click.echo(f" - {listed.name}")

        if interactive:
            if not click.confirm("Do you wish to proceed?"):
                return

    # Load the operations from all those migrations and concat together,
    # along with collecting external dependencies and detecting double-squashing
    operations = []
    dependencies = set()
    # We need to take all dependencies from the first migration in the list
    # as it may be 0002 depending on 0001
    first_migration = True
    for smigration in migrations_to_squash:
        if smigration.replaces:
            raise click.ClickException(
                "You cannot squash squashed migrations! Please transition it to a "
                "normal migration first"
            )
        operations.extend(smigration.operations)
        for dependency in smigration.dependencies:
            if isinstance(dependency, SettingsTuple):
                dependencies.add(dependency)
            elif dependency[0] != smigration.package_label or first_migration:
                dependencies.add(dependency)
        first_migration = False

    if no_optimize:
        if verbosity > 0:
            click.secho("(Skipping optimization.)", fg="yellow")
        new_operations = operations
    else:
        if verbosity > 0:
            click.secho("Optimizing...", fg="cyan")

        optimizer = MigrationOptimizer()
        new_operations = optimizer.optimize(operations, migration.package_label)

        if verbosity > 0:
            if len(new_operations) == len(operations):
                click.echo(" No optimizations possible.")
            else:
                click.echo(
                    f" Optimized from {len(operations)} operations to {len(new_operations)} operations."
                )

    # The new migration replaces every migration being squashed. Already
    # squashed migrations were rejected above, so none carries its own
    # ``replaces`` list to merge in.
    replaces: list[tuple[str, str]] = [
        (squashed.package_label, squashed.name) for squashed in migrations_to_squash
    ]

    # Make a new migration with those operations
    subclass = type(
        "Migration",
        (migrations.Migration,),
        {
            "dependencies": dependencies,
            "operations": new_operations,
            "replaces": replaces,
        },
    )
    if start_migration is not None:
        if squashed_name:
            # Use the name from --squashed-name, keeping the start migration's
            # prefix (the whole name when it contains no underscore).
            prefix = start_migration.name.partition("_")[0]
            name = f"{prefix}_{squashed_name}"
        else:
            # Generate a name
            name = f"{start_migration.name}_squashed_{migration.name}"
        new_migration = subclass(name, package_label)
    else:
        name = f"0001_{'squashed_' + migration.name if not squashed_name else squashed_name}"
        new_migration = subclass(name, package_label)
        new_migration.initial = True

    # Write out the new migration file, never overwriting an existing one
    writer = MigrationWriter(new_migration)
    if os.path.exists(writer.path):
        raise click.ClickException(
            f"Migration {new_migration.name} already exists. Use a different name."
        )
    with open(writer.path, "w", encoding="utf-8") as fh:
        fh.write(writer.as_string())

    if verbosity > 0:
        click.secho(
            f"Created new squashed migration {writer.path}", fg="green", bold=True
        )
        click.echo(
            " You should commit this migration but leave the old ones in place;\n"
            " the new migration will be used for new installs. Once you are sure\n"
            " all instances of the codebase have applied the migrations you squashed,\n"
            " you can delete them."
        )
        if writer.needs_manual_porting:
            click.secho("Manual porting required", fg="yellow", bold=True)
            click.echo(
                " Your migrations contained functions that must be manually copied over,\n"
                " as we could not safely copy their implementation.\n"
                " See the comment at the top of the squashed migration for details."
            )