v0.150.0
   1from __future__ import annotations
   2
   3import json
   4import re
   5from collections.abc import Iterator
   6from contextlib import contextmanager
   7from dataclasses import dataclass, field
   8from enum import StrEnum
   9from functools import cached_property
  10from typing import TYPE_CHECKING, Any
  11
  12import psycopg
  13
  14from ..constraints import CheckConstraint, UniqueConstraint
  15from ..ddl import (
  16    build_include_sql,
  17    compile_database_default_sql,
  18    compile_expression_sql,
  19    compile_index_expressions_sql,
  20    compile_literal_default_sql,
  21    deferrable_sql,
  22)
  23from ..deletion import sql_on_delete
  24from ..dialect import quote_name
  25from ..fields.base import ColumnField
  26from ..fields.related import ForeignKeyField
  27from ..indexes import Index
  28from ..introspection import (
  29    MANAGED_CONSTRAINT_TYPES,
  30    MANAGED_INDEX_ACCESS_METHODS,
  31    ColumnState,
  32    ConstraintState,
  33    ConType,
  34    TableState,
  35    introspect_table,
  36)
  37
  38if TYPE_CHECKING:
  39    from ..base import Model
  40    from ..connection import DatabaseConnection
  41    from ..expressions import Expression, ReplaceableExpression
  42    from ..fields import Field
  43    from ..query_utils import Q
  44    from ..utils import CursorWrapper
  45
  46
  47# Drift types — semantic descriptions of schema differences
  48
  49
class DriftKind(StrEnum):
    """Category of a detected difference between a model and the live schema.

    Most drift dataclasses in this module carry one of these as `kind`, and
    their `describe()` methods branch on it to render a message.
    """

    # Declared on the model but absent from the database.
    MISSING = "missing"
    # Index exists in the database but is reported as INVALID.
    INVALID = "invalid"
    # Present on both sides, but the definition/value differs.
    CHANGED = "changed"
    # Same object found under a different name (old/new names are carried).
    RENAMED = "renamed"
    # Present in the database but not declared on the model.
    UNDECLARED = "undeclared"
    # Constraint / foreign key exists but is NOT VALID.
    UNVALIDATED = "unvalidated"
  57
  58
  59@dataclass
  60class IndexDrift:
  61    """A schema difference for an index."""
  62
  63    kind: DriftKind
  64    table: str
  65    index: Index | None = None
  66    model: type[Model] | None = None
  67    old_name: str | None = None
  68    new_name: str | None = None
  69    name: str | None = None
  70
  71    def describe(self) -> str:
  72        match self.kind:
  73            case DriftKind.MISSING:
  74                assert self.index is not None
  75                return f"{self.table}: index {self.index.name} missing"
  76            case DriftKind.INVALID:
  77                assert self.index is not None
  78                return f"{self.table}: index {self.index.name} INVALID"
  79            case DriftKind.CHANGED:
  80                assert self.index is not None
  81                return f"{self.table}: index {self.index.name} definition changed"
  82            case DriftKind.RENAMED:
  83                return f"{self.table}: index {self.old_name}{self.new_name}"
  84            case _:
  85                return f"{self.table}: index {self.name} not declared"
  86
  87
  88@dataclass
  89class ConstraintDrift:
  90    """A schema difference for a constraint."""
  91
  92    kind: DriftKind
  93    table: str
  94    constraint: CheckConstraint | UniqueConstraint | None = None
  95    model: type[Model] | None = None
  96    old_name: str | None = None
  97    new_name: str | None = None
  98    name: str | None = None
  99
 100    def describe(self) -> str:
 101        match self.kind:
 102            case DriftKind.MISSING:
 103                assert self.constraint is not None
 104                return f"{self.table}: constraint {self.constraint.name} missing"
 105            case DriftKind.UNVALIDATED:
 106                return f"{self.table}: constraint {self.name} NOT VALID"
 107            case DriftKind.CHANGED:
 108                assert self.constraint is not None
 109                return f"{self.table}: constraint {self.constraint.name} definition changed"
 110            case DriftKind.RENAMED:
 111                return f"{self.table}: constraint {self.old_name}{self.new_name}"
 112            case _:
 113                return f"{self.table}: constraint {self.name} not declared"
 114
 115
 116@dataclass
 117class ForeignKeyDrift:
 118    """A schema difference for a foreign key constraint."""
 119
 120    kind: DriftKind
 121    table: str
 122    name: str | None = None
 123    column: str | None = None
 124    target_table: str | None = None
 125    target_column: str | None = None
 126    on_delete_clause: str = ""  # SQL clause to emit, e.g. " ON DELETE CASCADE"
 127    actual_action: str | None = None  # CHANGED only: current DB confdeltype
 128    expected_action: str | None = None  # CHANGED only: expected confdeltype
 129
 130    def describe(self) -> str:
 131        match self.kind:
 132            case DriftKind.MISSING:
 133                return f"{self.table}: FK {self.name} missing ({self.column}{self.target_table}.{self.target_column})"
 134            case DriftKind.UNVALIDATED:
 135                return f"{self.table}: FK {self.name} NOT VALID"
 136            case DriftKind.CHANGED:
 137                return (
 138                    f"{self.table}: FK {self.name} on_delete changed "
 139                    f"({self.actual_action!r}{self.expected_action!r})"
 140                )
 141            case _:
 142                return f"{self.table}: FK {self.name} not declared"
 143
 144
 145@dataclass
 146class NullabilityDrift:
 147    """Mismatch between model and DB column nullability."""
 148
 149    table: str
 150    column: str
 151    model_allows_null: bool
 152    has_null_rows: bool = False  # Only checked when model_allows_null is False
 153
 154    def describe(self) -> str:
 155        if not self.model_allows_null:
 156            if self.has_null_rows:
 157                return (
 158                    f"{self.table}: column {self.column} allows NULL (NULL rows exist)"
 159                )
 160            return f"{self.table}: column {self.column} allows NULL"
 161        return f"{self.table}: column {self.column} is NOT NULL, model allows NULL"
 162
 163
 164@dataclass
 165class ColumnDefaultDrift:
 166    """Mismatch between the model's declared default and the DB column DEFAULT."""
 167
 168    kind: DriftKind
 169    table: str
 170    column: str
 171    db_default_sql: str | None
 172    model_default_sql: str | None
 173
 174    def describe(self) -> str:
 175        match self.kind:
 176            case DriftKind.MISSING:
 177                return (
 178                    f"{self.table}: column {self.column} missing DEFAULT "
 179                    f"(expected {self.model_default_sql})"
 180                )
 181            case DriftKind.CHANGED:
 182                return (
 183                    f"{self.table}: column {self.column} DEFAULT mismatch — "
 184                    f"db has {self.db_default_sql}, model declares "
 185                    f"{self.model_default_sql}"
 186                )
 187            case _:
 188                return (
 189                    f"{self.table}: column {self.column} has undeclared DEFAULT "
 190                    f"{self.db_default_sql}"
 191                )
 192
 193
 194@dataclass
 195class StorageParameterDrift:
 196    """Mismatch between declared and live `pg_class.reloptions` for a table.
 197
 198    `key` carries a `toast.` prefix when the parameter belongs to the table's
 199    TOAST relation; convergence emits and reads it accordingly.
 200    """
 201
 202    kind: DriftKind
 203    table: str
 204    key: str
 205    declared_value: str | None = None
 206    actual_value: str | None = None
 207
 208    def describe(self) -> str:
 209        match self.kind:
 210            case DriftKind.MISSING:
 211                return (
 212                    f"{self.table}: storage parameter {self.key} missing "
 213                    f"(expected {self.declared_value})"
 214                )
 215            case DriftKind.CHANGED:
 216                return (
 217                    f"{self.table}: storage parameter {self.key} mismatch — "
 218                    f"db has {self.actual_value}, model declares "
 219                    f"{self.declared_value}"
 220                )
 221            case _:
 222                return (
 223                    f"{self.table}: storage parameter {self.key} not declared "
 224                    f"(db has {self.actual_value})"
 225                )
 226
 227
# Drift variants that attach to a single column (element type of
# ColumnStatus.drifts).
type ColumnDrift = NullabilityDrift | ColumnDefaultDrift
# Every drift variant this module reports (element type of
# ModelAnalysis.drifts).
type Drift = (
    IndexDrift | ConstraintDrift | ForeignKeyDrift | ColumnDrift | StorageParameterDrift
)
 232
 233
 234# Status objects — analysis results with optional drift
 235
 236
@dataclass
class ColumnStatus:
    """Comparison result for one column, declared on the model or DB-only."""

    name: str  # database column name
    field_name: str  # model field name; "" for columns not in the model
    type: str  # model's db type, or the live type for columns not in the model
    nullable: bool
    primary_key: bool
    pk_suffix: str  # db type suffix for primary-key fields, otherwise ""
    issue: str | None = None  # human-readable problem; None when nothing to report
    drifts: list[ColumnDrift] = field(default_factory=list)
 247
 248
@dataclass
class IndexStatus:
    """Comparison result for one index, declared on the model or DB-only."""

    name: str
    fields: list[str] = field(default_factory=list)
    issue: str | None = None  # human-readable problem; None when nothing to report
    drift: IndexDrift | None = None  # structured difference, when one was detected
    access_method: str | None = None  # set for unmanaged indexes (display only)
 256
 257
@dataclass
class ConstraintStatus:
    """Comparison result for one constraint, declared on the model or DB-only."""

    name: str
    constraint_type: ConType
    fields: list[str] = field(default_factory=list)
    issue: str | None = None  # human-readable problem; None when nothing to report
    # Structured difference, when one was detected.
    # NOTE(review): IndexDrift is presumably used for unique constraints
    # backed by a plain unique index — confirm against the comparison code.
    drift: ConstraintDrift | ForeignKeyDrift | IndexDrift | None = None
 265
 266
 267@dataclass
 268class ModelAnalysis:
 269    label: str
 270    table: str
 271    table_issues: list[str] = field(default_factory=list)
 272    columns: list[ColumnStatus] = field(default_factory=list)
 273    indexes: list[IndexStatus] = field(default_factory=list)
 274    constraints: list[ConstraintStatus] = field(default_factory=list)
 275    storage_parameter_drifts: list[StorageParameterDrift] = field(default_factory=list)
 276
 277    @cached_property
 278    def drifts(self) -> list[Drift]:
 279        """All detected schema drifts."""
 280        result: list[Drift] = []
 281        for col in self.columns:
 282            result.extend(col.drifts)
 283        for idx in self.indexes:
 284            if idx.drift:
 285                result.append(idx.drift)
 286        for con in self.constraints:
 287            if con.drift:
 288                result.append(con.drift)
 289        result.extend(self.storage_parameter_drifts)
 290        return result
 291
 292    @cached_property
 293    def issue_count(self) -> int:
 294        """Total issues (table + columns + indexes + constraints + storage)."""
 295        count = len(self.table_issues)
 296        count += sum(1 for col in self.columns if col.issue)
 297        count += sum(1 for idx in self.indexes if idx.issue)
 298        count += sum(1 for con in self.constraints if con.issue)
 299        count += len(self.storage_parameter_drifts)
 300        return count
 301
 302    def to_dict(self) -> dict[str, Any]:
 303        """Serialize for --json output."""
 304        return {
 305            "label": self.label,
 306            "table": self.table,
 307            "table_issues": self.table_issues,
 308            "columns": [
 309                {
 310                    "name": col.name,
 311                    "field_name": col.field_name,
 312                    "type": col.type,
 313                    "nullable": col.nullable,
 314                    "primary_key": col.primary_key,
 315                    "pk_suffix": col.pk_suffix,
 316                    "issue": col.issue,
 317                    "drifts": [d.describe() for d in col.drifts],
 318                }
 319                for col in self.columns
 320            ],
 321            "indexes": [
 322                {
 323                    "name": idx.name,
 324                    "fields": idx.fields,
 325                    "access_method": idx.access_method,
 326                    "issue": idx.issue,
 327                    "drift": idx.drift.describe() if idx.drift else None,
 328                }
 329                for idx in self.indexes
 330            ],
 331            "constraints": [
 332                {
 333                    "name": con.name,
 334                    "constraint_type": con.constraint_type,
 335                    "type_label": con.constraint_type.label,
 336                    "fields": con.fields,
 337                    "issue": con.issue,
 338                    "drift": con.drift.describe() if con.drift else None,
 339                }
 340                for con in self.constraints
 341            ],
 342            "storage_parameter_drifts": [
 343                {
 344                    "key": d.key,
 345                    "kind": d.kind,
 346                    "declared_value": d.declared_value,
 347                    "actual_value": d.actual_value,
 348                }
 349                for d in self.storage_parameter_drifts
 350            ],
 351        }
 352
 353
 354def analyze_model(
 355    conn: DatabaseConnection, cursor: CursorWrapper, model: type[Model]
 356) -> ModelAnalysis:
 357    """Compare a model against the database and classify each difference.
 358
 359    Introspects the actual table state, compares it against model definitions,
 360    and produces a ModelAnalysis where each column/index/constraint carries its
 361    issue (if any) and drift object (if schema differs).
 362    """
 363    table_name = model.model_options.db_table
 364    db = introspect_table(conn, cursor, table_name)
 365
 366    if not db.exists:
 367        return ModelAnalysis(
 368            label=model.model_options.label,
 369            table=table_name,
 370            table_issues=["table missing from database"],
 371        )
 372
 373    return ModelAnalysis(
 374        label=model.model_options.label,
 375        table=table_name,
 376        columns=_compare_columns(model, db, table_name, cursor),
 377        indexes=_compare_indexes(cursor, model, db, table_name),
 378        constraints=_compare_constraints(cursor, model, db, table_name),
 379        storage_parameter_drifts=_compare_storage_parameters(model, db, table_name),
 380    )
 381
 382
 383def _compare_storage_parameters(
 384    model: type[Model], db: TableState, table: str
 385) -> list[StorageParameterDrift]:
 386    """Diff declared `model_options.storage_parameters` against `pg_class.reloptions`.
 387
 388    Declared keys missing from the DB → MISSING. Mismatched values → CHANGED.
 389    Live keys not declared → UNDECLARED (so convergence can RESET them, keeping
 390    the model as the source of truth — matches how indexes/constraints work).
 391    """
 392    declared = model.model_options.storage_parameters
 393    actual = db.storage_parameters
 394    drifts: list[StorageParameterDrift] = []
 395
 396    for key, declared_value in declared.items():
 397        actual_value = actual.get(key)
 398        if actual_value is None:
 399            drifts.append(
 400                StorageParameterDrift(
 401                    kind=DriftKind.MISSING,
 402                    table=table,
 403                    key=key,
 404                    declared_value=declared_value,
 405                )
 406            )
 407        elif actual_value != declared_value:
 408            drifts.append(
 409                StorageParameterDrift(
 410                    kind=DriftKind.CHANGED,
 411                    table=table,
 412                    key=key,
 413                    declared_value=declared_value,
 414                    actual_value=actual_value,
 415                )
 416            )
 417
 418    for key, actual_value in actual.items():
 419        if key not in declared:
 420            drifts.append(
 421                StorageParameterDrift(
 422                    kind=DriftKind.UNDECLARED,
 423                    table=table,
 424                    key=key,
 425                    actual_value=actual_value,
 426                )
 427            )
 428
 429    return drifts
 430
 431
 432# Column comparison
 433
 434
 435def _column_has_nulls(cursor: CursorWrapper, table: str, column: str) -> bool:
 436    """Check whether any NULL values exist in a column."""
 437    cursor.execute(
 438        f"SELECT 1 FROM {quote_name(table)} WHERE {quote_name(column)} IS NULL LIMIT 1"
 439    )
 440    return cursor.fetchone() is not None
 441
 442
 443def _compare_columns(
 444    model: type[Model], db: TableState, table: str, cursor: CursorWrapper
 445) -> list[ColumnStatus]:
 446    statuses: list[ColumnStatus] = []
 447    expected_col_names: set[str] = set()
 448
 449    for f in model._model_meta.local_fields:
 450        if not isinstance(f, ColumnField):
 451            continue
 452        db_type = f.db_type()
 453        if db_type is None:
 454            continue
 455
 456        expected_col_names.add(f.column)
 457        issue: str | None = None
 458        drifts: list[ColumnDrift] = []
 459
 460        if f.column not in db.columns:
 461            issue = "missing from database"
 462        else:
 463            actual = db.columns[f.column]
 464            if db_type != actual.type:
 465                issue = f"expected {db_type}, actual {actual.type}"
 466            elif not f.allow_null and not actual.not_null:
 467                # Model says NOT NULL, DB allows NULL — semantic drift
 468                has_nulls = _column_has_nulls(cursor, table, f.column)
 469                if has_nulls:
 470                    issue = "expected NOT NULL, actual NULL (NULL rows exist)"
 471                else:
 472                    issue = "expected NOT NULL, actual NULL"
 473                drifts.append(
 474                    NullabilityDrift(
 475                        table=table,
 476                        column=f.column,
 477                        model_allows_null=False,
 478                        has_null_rows=has_nulls,
 479                    )
 480                )
 481            elif f.allow_null and actual.not_null:
 482                issue = "expected NULL, actual NOT NULL"
 483                drifts.append(
 484                    NullabilityDrift(
 485                        table=table,
 486                        column=f.column,
 487                        model_allows_null=True,
 488                    )
 489                )
 490
 491            if default_drift := _compare_column_default(cursor, f, actual, table):
 492                drifts.append(default_drift)
 493                if issue is None:
 494                    issue = default_drift.describe()
 495
 496        pk_suffix = ""
 497        if f.primary_key:
 498            pk_suffix = f.db_type_suffix() or ""
 499
 500        assert f.name is not None
 501        statuses.append(
 502            ColumnStatus(
 503                name=f.column,
 504                field_name=f.name,
 505                type=db_type,
 506                nullable=f.allow_null,
 507                primary_key=f.primary_key,
 508                pk_suffix=pk_suffix,
 509                issue=issue,
 510                drifts=drifts,
 511            )
 512        )
 513
 514    for col_name in sorted(db.columns.keys() - expected_col_names):
 515        actual = db.columns[col_name]
 516        statuses.append(
 517            ColumnStatus(
 518                name=col_name,
 519                field_name="",
 520                type=actual.type,
 521                nullable=not actual.not_null,
 522                primary_key=False,
 523                pk_suffix="",
 524                issue="extra column, not in model",
 525            )
 526        )
 527
 528    return statuses
 529
 530
 531_JSONB_LITERAL_RE = re.compile(r"^\s*'((?:[^']|'')*)'::jsonb\s*$", re.IGNORECASE)
 532
 533
 534def _extract_jsonb_literal(sql: str) -> str | None:
 535    m = _JSONB_LITERAL_RE.match(sql)
 536    if m is None:
 537        return None
 538    return m.group(1).replace("''", "'")
 539
 540
 541def _canonicalize_default_expr(
 542    cursor: CursorWrapper, model: type[Model], column: str, default_sql: str
 543) -> str:
 544    """Round-trip a column DEFAULT through Postgres so both sides of the
 545    comparison go through pg_get_expr — the live default comes from
 546    `pg_attrdef.adbin` during introspection, and the expected side is set on
 547    the temp table column then read the same way.
 548
 549    Returns "" if the model SQL is incompatible with the live column shape;
 550    comparison then sees inequality and reports drift.
 551    """
 552    try:
 553        with _canon_table(cursor, model):
 554            cursor.execute(
 555                f"ALTER TABLE {_CANON_TABLE} ALTER COLUMN {quote_name(column)} "
 556                f"SET DEFAULT {default_sql}"
 557            )
 558            cursor.execute(
 559                "SELECT pg_get_expr(ad.adbin, ad.adrelid) FROM pg_attrdef ad "
 560                "JOIN pg_attribute a ON a.attnum = ad.adnum AND a.attrelid = ad.adrelid "
 561                "WHERE a.attrelid = (SELECT oid FROM pg_class WHERE relname = %s "
 562                "AND relnamespace = pg_my_temp_schema()) "
 563                "AND a.attname = %s",
 564                [_CANON_TABLE, column],
 565            )
 566            row = cursor.fetchone()
 567            return row[0] if row else ""
 568    except _CANON_FALLBACK_ERRORS:
 569        return ""
 570
 571
 572def _compare_column_default(
 573    cursor: CursorWrapper, field: Field, actual: ColumnState, table: str
 574) -> ColumnDefaultDrift | None:
 575    expected_sql: str | None = None
 576    db_default_expr = field.get_db_default_expression()
 577    if db_default_expr is not None:
 578        expected_sql = compile_database_default_sql(db_default_expr)
 579    elif field.has_persistent_literal_default():
 580        expected_sql = compile_literal_default_sql(field)
 581
 582    if expected_sql is not None:
 583        if actual.default_sql is None:
 584            return ColumnDefaultDrift(
 585                kind=DriftKind.MISSING,
 586                table=table,
 587                column=field.column,
 588                db_default_sql=None,
 589                model_default_sql=expected_sql,
 590            )
 591
 592        canonical_expected = _canonicalize_default_expr(
 593            cursor, field.model, field.column, expected_sql
 594        )
 595        if canonical_expected == actual.default_sql:
 596            return None
 597        # Semantic compare for jsonb — PG canonicalizes object keys, which
 598        # won't match Python's dict-insertion order even after a round-trip.
 599        m_json = _extract_jsonb_literal(canonical_expected)
 600        d_json = _extract_jsonb_literal(actual.default_sql)
 601        if m_json is not None and d_json is not None:
 602            try:
 603                if json.loads(m_json) == json.loads(d_json):
 604                    return None
 605            except json.JSONDecodeError:
 606                pass
 607
 608        return ColumnDefaultDrift(
 609            kind=DriftKind.CHANGED,
 610            table=table,
 611            column=field.column,
 612            db_default_sql=actual.default_sql,
 613            model_default_sql=expected_sql,
 614        )
 615
 616    if actual.default_sql is None:
 617        return None
 618
 619    return ColumnDefaultDrift(
 620        kind=DriftKind.UNDECLARED,
 621        table=table,
 622        column=field.column,
 623        db_default_sql=actual.default_sql,
 624        model_default_sql=None,
 625    )
 626
 627
 628# Index comparison with rename detection
 629
 630
def _compare_indexes(
    cursor: CursorWrapper, model: type[Model], db: TableState, table: str
) -> list[IndexStatus]:
    """Compare the model's declared indexes with the table's non-unique indexes.

    The returned list is ordered as follows:

    1. Declared indexes whose name exists in the database, in declaration
       order: name conflicts with an unmanaged access method (issue only),
       INVALID indexes, definition changes, and clean matches.
    2. Renames: a declared-but-missing index paired with an undeclared
       managed index that has the same canonical definition (only when the
       pairing is one-to-one).
    3. Declared indexes still missing after rename detection.
    4. Undeclared managed indexes not consumed by a rename.
    5. Undeclared unmanaged indexes — informational, no issue or drift.
    """
    statuses: list[IndexStatus] = []
    missing: list[Index] = []
    model_index_names = {idx.name for idx in model.model_options.indexes}
    # Unique indexes are handled by _compare_unique_constraints, not here.
    # Also exclude indexes that back unique constraints in pg_constraint.
    unique_constraint_names = {
        k for k, v in db.constraints.items() if v.constraint_type == ConType.UNIQUE
    }
    non_unique_indexes = {
        k: v
        for k, v in db.indexes.items()
        if not v.is_unique and k not in unique_constraint_names
    }

    for index in model.model_options.indexes:
        if index.name not in non_unique_indexes:
            # Deferred: may turn out to be a rename of an undeclared index.
            missing.append(index)
            continue

        db_idx = non_unique_indexes[index.name]

        # Name collision: DB has an unmanaged index type with this name
        if db_idx.access_method not in MANAGED_INDEX_ACCESS_METHODS:
            statuses.append(
                IndexStatus(
                    name=index.name,
                    fields=list(index.fields),
                    issue=f"name conflict with {db_idx.access_method} index — rename one to resolve",
                )
            )
            continue

        if not db_idx.is_valid:
            statuses.append(
                IndexStatus(
                    name=index.name,
                    fields=list(index.fields),
                    issue="INVALID — needs drop and recreate",
                    drift=IndexDrift(
                        kind=DriftKind.INVALID,
                        table=table,
                        index=index,
                        model=model,
                    ),
                )
            )
            continue

        # Check if definition matches
        # (skipped when introspection yielded no definition text)
        if db_idx.definition:
            issue = _compare_canonical_index(
                cursor=cursor,
                model=model,
                expressions=index.expressions,
                fields_orders=list(index.fields_orders),
                opclasses=list(index.opclasses),
                condition=index.condition,
                include=index.include,
                actual_def=db_idx.definition,
            )
            if issue:
                statuses.append(
                    IndexStatus(
                        name=index.name,
                        fields=list(index.fields),
                        issue=issue,
                        drift=IndexDrift(
                            kind=DriftKind.CHANGED,
                            table=table,
                            index=index,
                            model=model,
                        ),
                    )
                )
                continue

        # Index exists and matches
        statuses.append(
            IndexStatus(
                name=index.name,
                fields=list(index.fields),
            )
        )

    # Extra indexes (in DB but not in model)
    extra_names = sorted(non_unique_indexes.keys() - model_index_names)

    # Only managed index types participate in rename detection
    managed_extra = [
        n
        for n in extra_names
        if non_unique_indexes[n].access_method in MANAGED_INDEX_ACCESS_METHODS
    ]

    # Detect renames by canonical (round-tripped) index body. Build the cheap
    # already-introspected side first so we can skip the per-missing
    # canonicalization loop on the steady-state path.
    renamed_missing: set[str] = set()
    renamed_extra: set[str] = set()

    # Undeclared managed indexes grouped by the tail of their definition.
    extra_by_def: dict[str, list[str]] = {}
    for name in managed_extra:
        defn = non_unique_indexes[name].definition
        if defn:
            extra_by_def.setdefault(_index_def_tail(defn), []).append(name)

    # Missing declared indexes grouped by their canonicalized definition.
    missing_by_def: dict[str, list[Index]] = {}
    if extra_by_def:
        for index in missing:
            expected_tail = _canonicalize_index_def(
                cursor,
                model,
                expressions=index.expressions,
                fields_orders=list(index.fields_orders),
                opclasses=list(index.opclasses),
                condition=index.condition,
                include=index.include,
            )
            if not expected_tail:
                # Canonicalization failed; bucketing under "" would conflate
                # multiple sentinel-failing indexes and disable rename
                # detection for the rest.
                continue
            missing_by_def.setdefault(expected_tail, []).append(index)

    for tail, m_list in missing_by_def.items():
        e_list = extra_by_def.get(tail)
        # Only an unambiguous one-to-one pairing is treated as a rename.
        if e_list and len(m_list) == 1 and len(e_list) == 1:
            index = m_list[0]
            old_name = e_list[0]
            statuses.append(
                IndexStatus(
                    name=index.name,
                    fields=list(index.fields),
                    issue=f"rename from {old_name}",
                    drift=IndexDrift(
                        kind=DriftKind.RENAMED,
                        table=table,
                        old_name=old_name,
                        new_name=index.name,
                    ),
                )
            )
            renamed_missing.add(index.name)
            renamed_extra.add(old_name)

    # Remaining unmatched missing
    for index in missing:
        if index.name not in renamed_missing:
            statuses.append(
                IndexStatus(
                    name=index.name,
                    fields=list(index.fields),
                    issue="missing from database",
                    drift=IndexDrift(
                        kind=DriftKind.MISSING,
                        table=table,
                        index=index,
                        model=model,
                    ),
                )
            )

    # Extra managed indexes are undeclared
    for name in managed_extra:
        if name not in renamed_extra:
            statuses.append(
                IndexStatus(
                    name=name,
                    fields=non_unique_indexes[name].columns,
                    issue="not in model",
                    drift=IndexDrift(
                        kind=DriftKind.UNDECLARED,
                        table=table,
                        name=name,
                    ),
                )
            )

    # Extra unmanaged indexes — informational only, no drift
    for name in extra_names:
        idx = non_unique_indexes[name]
        if idx.access_method not in MANAGED_INDEX_ACCESS_METHODS:
            statuses.append(
                IndexStatus(
                    name=name,
                    fields=idx.columns,
                    access_method=idx.access_method,
                )
            )

    return statuses
 826
 827
 828# Constraint comparison
 829
 830
 831def _compare_constraints(
 832    cursor: CursorWrapper, model: type[Model], db: TableState, table: str
 833) -> list[ConstraintStatus]:
 834    statuses: list[ConstraintStatus] = []
 835    statuses.extend(_compare_unique_constraints(cursor, model, db, table))
 836    statuses.extend(_compare_check_constraints(cursor, model, db, table))
 837    statuses.extend(_compare_foreign_keys(model, db, table))
 838
 839    # Unmanaged constraint types — informational only, no drift.
 840    # Primary keys are also unmanaged but not shown.
 841    for name, cs in db.constraints.items():
 842        if (
 843            cs.constraint_type not in MANAGED_CONSTRAINT_TYPES
 844            and cs.constraint_type != ConType.PRIMARY
 845        ):
 846            statuses.append(
 847                ConstraintStatus(
 848                    name=name,
 849                    constraint_type=cs.constraint_type,
 850                    fields=cs.columns,
 851                )
 852            )
 853
 854    return statuses
 855
 856
 857def _compare_unique_constraints(
 858    cursor: CursorWrapper, model: type[Model], db: TableState, table: str
 859) -> list[ConstraintStatus]:
 860    statuses: list[ConstraintStatus] = []
 861    # Unique constraints from pg_constraint (contype='u')
 862    actual_constraints = {
 863        k: v for k, v in db.constraints.items() if v.constraint_type == ConType.UNIQUE
 864    }
 865    # Unique indexes from pg_index that don't have a backing pg_constraint
 866    # (e.g. partial/expression unique indexes created with CREATE UNIQUE INDEX)
 867    actual_indexes = {
 868        k: ConstraintState(
 869            constraint_type=ConType.UNIQUE,
 870            columns=v.columns,
 871            validated=True,
 872            definition=v.definition,
 873        )
 874        for k, v in db.indexes.items()
 875        if v.is_unique and k not in actual_constraints
 876    }
 877    actual = {**actual_constraints, **actual_indexes}
 878    model_constraints = [
 879        c for c in model.model_options.constraints if isinstance(c, UniqueConstraint)
 880    ]
 881    expected_names = {c.name for c in model_constraints}
 882    extra_names = sorted(actual.keys() - expected_names)
 883
 884    missing: list[UniqueConstraint] = []
 885    for constraint in model_constraints:
 886        if constraint.name not in actual:
 887            missing.append(constraint)
 888            continue
 889
 890        issue: str | None = None
 891        drift: ConstraintDrift | None = None
 892
 893        if not actual[constraint.name].validated:
 894            issue = "NOT VALID — needs validation"
 895            drift = ConstraintDrift(
 896                kind=DriftKind.UNVALIDATED,
 897                table=table,
 898                name=constraint.name,
 899            )
 900        elif constraint.index_only:
 901            issue, drift = _compare_index_only_unique(
 902                cursor, model, constraint, actual[constraint.name], table
 903            )
 904        elif actual_def := actual[constraint.name].definition:
 905            expected_def = _get_expected_unique_definition(cursor, model, constraint)
 906            # Both sides are deparsed by pg_get_constraintdef → string equality.
 907            if actual_def != expected_def:
 908                if expected_def:
 909                    issue = f"definition differs: DB has {actual_def!r}, model expects {expected_def!r}"
 910                else:
 911                    # Round-trip canonicalization couldn't complete; canonical
 912                    # model text is unavailable for the diagnostic.
 913                    issue = f"definition differs: DB has {actual_def!r}"
 914                drift = ConstraintDrift(
 915                    kind=DriftKind.CHANGED,
 916                    table=table,
 917                    constraint=constraint,
 918                    model=model,
 919                )
 920
 921        statuses.append(
 922            ConstraintStatus(
 923                name=constraint.name,
 924                constraint_type=ConType.UNIQUE,
 925                fields=list(constraint.fields),
 926                issue=issue,
 927                drift=drift,
 928            )
 929        )
 930
 931    # Detect renames by columns
 932    rename_statuses, renamed_missing, renamed_extra = _detect_unique_renames(
 933        cursor, missing, extra_names, actual, model, table
 934    )
 935    statuses.extend(rename_statuses)
 936
 937    for constraint in missing:
 938        if constraint.name not in renamed_missing:
 939            statuses.append(
 940                ConstraintStatus(
 941                    name=constraint.name,
 942                    constraint_type=ConType.UNIQUE,
 943                    fields=list(constraint.fields),
 944                    issue="missing from database",
 945                    drift=ConstraintDrift(
 946                        kind=DriftKind.MISSING,
 947                        table=table,
 948                        constraint=constraint,
 949                        model=model,
 950                    ),
 951                )
 952            )
 953
 954    for name in extra_names:
 955        if name not in renamed_extra:
 956            # Index-only entries (from pg_index, not pg_constraint) need
 957            # IndexDrift so the planner uses DROP INDEX, not DROP CONSTRAINT.
 958            undeclared_drift: Drift
 959            if name in actual_indexes:
 960                undeclared_drift = IndexDrift(
 961                    kind=DriftKind.UNDECLARED, table=table, name=name
 962                )
 963            else:
 964                undeclared_drift = ConstraintDrift(
 965                    kind=DriftKind.UNDECLARED, table=table, name=name
 966                )
 967            statuses.append(
 968                ConstraintStatus(
 969                    name=name,
 970                    constraint_type=ConType.UNIQUE,
 971                    fields=actual[name].columns,
 972                    issue="not in model",
 973                    drift=undeclared_drift,
 974                )
 975            )
 976
 977    return statuses
 978
 979
 980def _compare_check_constraints(
 981    cursor: CursorWrapper, model: type[Model], db: TableState, table: str
 982) -> list[ConstraintStatus]:
 983    statuses: list[ConstraintStatus] = []
 984    actual = {
 985        k: v for k, v in db.constraints.items() if v.constraint_type == ConType.CHECK
 986    }
 987    model_constraints = [
 988        c for c in model.model_options.constraints if isinstance(c, CheckConstraint)
 989    ]
 990    expected_names = {c.name for c in model_constraints}
 991    extra_names = sorted(actual.keys() - expected_names)
 992
 993    missing: list[CheckConstraint] = []
 994    for constraint in model_constraints:
 995        if constraint.name not in actual:
 996            missing.append(constraint)
 997            continue
 998
 999        issue: str | None = None
1000        drift: ConstraintDrift | None = None
1001
1002        if not actual[constraint.name].validated:
1003            issue = "NOT VALID — needs validation"
1004            drift = ConstraintDrift(
1005                kind=DriftKind.UNVALIDATED,
1006                table=table,
1007                name=constraint.name,
1008            )
1009        elif actual_def := actual[constraint.name].definition:
1010            expected_def = _get_expected_check_definition(cursor, model, constraint)
1011            # Both sides are deparsed by pg_get_constraintdef → string equality.
1012            if actual_def != expected_def:
1013                if expected_def:
1014                    issue = f"definition differs: DB has {actual_def!r}, model expects {expected_def!r}"
1015                else:
1016                    # Round-trip canonicalization couldn't complete; canonical
1017                    # model text is unavailable for the diagnostic.
1018                    issue = f"definition differs: DB has {actual_def!r}"
1019                drift = ConstraintDrift(
1020                    kind=DriftKind.CHANGED,
1021                    table=table,
1022                    constraint=constraint,
1023                    model=model,
1024                )
1025
1026        statuses.append(
1027            ConstraintStatus(
1028                name=constraint.name,
1029                constraint_type=ConType.CHECK,
1030                fields=[],
1031                issue=issue,
1032                drift=drift,
1033            )
1034        )
1035
1036    # Detect renames by definition
1037    rename_statuses, renamed_missing, renamed_extra = _detect_check_renames(
1038        cursor, missing, extra_names, actual, model, table
1039    )
1040    statuses.extend(rename_statuses)
1041
1042    for constraint in missing:
1043        if constraint.name not in renamed_missing:
1044            statuses.append(
1045                ConstraintStatus(
1046                    name=constraint.name,
1047                    constraint_type=ConType.CHECK,
1048                    fields=[],
1049                    issue="missing from database",
1050                    drift=ConstraintDrift(
1051                        kind=DriftKind.MISSING,
1052                        table=table,
1053                        constraint=constraint,
1054                        model=model,
1055                    ),
1056                )
1057            )
1058
1059    # Build set of framework-owned temp NOT NULL check names so leftover
1060    # artifacts from a partially-completed SetNotNullFix are silently
1061    # ignored rather than surfaced as undeclared user constraints.
1062    internal_checks = {
1063        generate_notnull_check_name(table, f.column)
1064        for f in model._model_meta.local_fields
1065        if f.db_type() is not None
1066    }
1067
1068    for name in extra_names:
1069        if name not in renamed_extra and name not in internal_checks:
1070            statuses.append(
1071                ConstraintStatus(
1072                    name=name,
1073                    constraint_type=ConType.CHECK,
1074                    fields=actual[name].columns,
1075                    issue="not in model",
1076                    drift=ConstraintDrift(
1077                        kind=DriftKind.UNDECLARED,
1078                        table=table,
1079                        name=name,
1080                    ),
1081                )
1082            )
1083
1084    return statuses
1085
1086
def _compare_foreign_keys(
    model: type[Model], db: TableState, table: str
) -> list[ConstraintStatus]:
    """Compare the model's foreign-key fields with the table's FK constraints.

    Foreign keys are matched by shape — ``(column, target_table,
    target_column)`` — not by constraint name, so a database FK with a
    different name but the same shape counts as present and is reported
    under its actual database name.

    Args:
        model: Model whose local ``ForeignKeyField``s (with ``db_constraint``
            enabled) define the expected foreign keys.
        db: Introspected table state; only its FOREIGN KEY constraints are read.
        table: Table name recorded on the emitted drift objects and used to
            derive the expected constraint name.

    Returns:
        One status per expected FK, in model field order: matched (possibly
        flagged with an on_delete mismatch or as NOT VALID) or missing. These
        are followed by one "not in model" status per unmatched database FK,
        sorted by constraint name.
    """
    statuses: list[ConstraintStatus] = []
    actual = {
        k: v
        for k, v in db.constraints.items()
        if v.constraint_type == ConType.FOREIGN_KEY
    }

    # Build expected FKs from model fields.
    # Key: shape (column, target_table, target_column)
    # Value: (field_name, constraint_name, expected_on_delete_clause, expected_confdeltype)
    expected_fks: dict[tuple[str, str, str], tuple[str, str, str, str]] = {}
    for f in model._model_meta.local_fields:
        if isinstance(f, ForeignKeyField) and f.db_constraint:
            assert f.name is not None
            to_table = f.target_field.model.model_options.db_table
            to_column = f.target_field.column
            constraint_name = generate_fk_constraint_name(
                table, f.column, to_table, to_column
            )
            on_delete_clause, confdeltype = sql_on_delete(f.remote_field.on_delete)
            expected_fks[(f.column, to_table, to_column)] = (
                f.name,
                constraint_name,
                on_delete_clause,
                confdeltype,
            )

    # Build actual FKs from DB: shape → (constraint_name, ConstraintState).
    # Only the first constrained column takes part in the shape, and entries
    # lacking target information are left out (they end up as undeclared).
    actual_fk_by_shape: dict[tuple[str, str, str], tuple[str, ConstraintState]] = {}
    for name, cs in actual.items():
        if cs.target_table and cs.target_column and cs.columns:
            actual_fk_by_shape[(cs.columns[0], cs.target_table, cs.target_column)] = (
                name,
                cs,
            )

    matched_fk_names: set[str] = set()
    for key, (
        field_name,
        constraint_name,
        on_delete_clause,
        expected_action,
    ) in expected_fks.items():
        col, to_table, to_column = key
        if match := actual_fk_by_shape.get(key):
            actual_name, cs = match
            matched_fk_names.add(actual_name)

            issue: str | None = None
            drift: ForeignKeyDrift | None = None

            # on_delete action mismatch — drop + re-add with new clause.
            # Takes precedence over the NOT VALID report below.
            if (
                cs.on_delete_action is not None
                and cs.on_delete_action != expected_action
            ):
                issue = (
                    f"on_delete action differs "
                    f"({cs.on_delete_action!r} → {expected_action!r})"
                )
                drift = ForeignKeyDrift(
                    kind=DriftKind.CHANGED,
                    table=table,
                    name=actual_name,
                    column=col,
                    target_table=to_table,
                    target_column=to_column,
                    on_delete_clause=on_delete_clause,
                    actual_action=cs.on_delete_action,
                    expected_action=expected_action,
                )
            elif not cs.validated:
                issue = "NOT VALID — needs validation"
                drift = ForeignKeyDrift(
                    kind=DriftKind.UNVALIDATED,
                    table=table,
                    name=actual_name,
                )

            statuses.append(
                ConstraintStatus(
                    name=actual_name,
                    constraint_type=ConType.FOREIGN_KEY,
                    fields=[col],
                    issue=issue,
                    drift=drift,
                )
            )
        else:
            # No constraint exists yet, so the status is labelled with the
            # relationship; the drift carries the name the constraint will get.
            statuses.append(
                ConstraintStatus(
                    name=f"{field_name} → {to_table}.{to_column}",
                    constraint_type=ConType.FOREIGN_KEY,
                    fields=[col],
                    issue="missing from database",
                    drift=ForeignKeyDrift(
                        kind=DriftKind.MISSING,
                        table=table,
                        name=constraint_name,
                        column=col,
                        target_table=to_table,
                        target_column=to_column,
                        on_delete_clause=on_delete_clause,
                    ),
                )
            )

    # Whatever the model didn't claim by shape is undeclared.
    for name in sorted(actual.keys() - matched_fk_names):
        cs = actual[name]
        statuses.append(
            ConstraintStatus(
                name=name,
                constraint_type=ConType.FOREIGN_KEY,
                fields=cs.columns,
                issue=f"not in model (→ {cs.target_table}.{cs.target_column})",
                drift=ForeignKeyDrift(
                    kind=DriftKind.UNDECLARED,
                    table=table,
                    name=name,
                ),
            )
        )

    return statuses
1215
1216
def generate_notnull_check_name(table: str, column: str) -> str:
    """Return the hashed name of the temporary NOT NULL check for a column.

    SetNotNullFix uses this name for its CHECK NOT VALID → VALIDATE →
    SET NOT NULL sequence, and analysis uses it to recognize (and ignore)
    temp checks left behind by that sequence.
    """
    from ..utils import generate_identifier_name

    checked_columns = [column]
    return generate_identifier_name(table, checked_columns, "_notnull")
1226
1227
def generate_fk_constraint_name(
    table: str, column: str, target_table: str, target_column: str
) -> str:
    """Return the deterministic name for a foreign-key constraint.

    The naming algorithm is the one the schema editor uses, so FKs created
    by convergence get the same names as FKs created by migrations.
    """
    from ..utils import generate_identifier_name, split_identifier

    # Only the second part of the split target identifier goes into the suffix.
    _namespace, bare_target = split_identifier(target_table)
    return generate_identifier_name(
        table, [column], f"_fk_{bare_target}_{target_column}"
    )
1241
1242
def _detect_unique_renames(
    cursor: CursorWrapper,
    missing: list[UniqueConstraint],
    extra_names: list[str],
    actual_dict: dict[str, ConstraintState],
    model: type[Model],
    table: str,
) -> tuple[list[ConstraintStatus], set[str], set[str]]:
    """Pair missing and extra unique constraints that differ only by name.

    Field-based constraints are paired by their resolved column tuple;
    index-only ones must additionally agree on the canonical
    (round-tripped) index definition. Expression-based constraints are
    paired purely by canonical index definition, which captures WHERE
    clauses, opclasses and expressions. A pairing is accepted only when it
    is unambiguous: exactly one missing and one extra entry per key.

    Returns the rename statuses plus the sets of model-side and
    database-side names that were consumed by a rename.
    """
    statuses: list[ConstraintStatus] = []
    renamed_missing: set[str] = set()
    renamed_extra: set[str] = set()

    def record_rename(
        constraint: UniqueConstraint,
        old_name: str,
        drift_cls: type[IndexDrift] | type[ConstraintDrift],
    ) -> None:
        # Emit one rename status and mark both sides as consumed.
        statuses.append(
            ConstraintStatus(
                name=constraint.name,
                constraint_type=ConType.UNIQUE,
                fields=list(constraint.fields),
                issue=f"rename from {old_name}",
                drift=drift_cls(
                    kind=DriftKind.RENAMED,
                    table=table,
                    old_name=old_name,
                    new_name=constraint.name,
                ),
            )
        )
        renamed_missing.add(constraint.name)
        renamed_extra.add(old_name)

    # Phase 1: field-based constraints, keyed by resolved column tuple.
    # Applies to constraint-backed and index-only variants alike.
    candidates_by_cols: dict[tuple[str, ...], list[UniqueConstraint]] = {}
    for constraint in missing:
        if constraint.fields:
            column_key = tuple(
                model._model_meta.get_forward_field(field_name).column
                for field_name in constraint.fields
            )
            candidates_by_cols.setdefault(column_key, []).append(constraint)

    extras_by_cols: dict[tuple[str, ...], list[str]] = {}
    for extra_name in extra_names:
        column_key = tuple(actual_dict[extra_name].columns)
        if column_key:
            extras_by_cols.setdefault(column_key, []).append(extra_name)

    for column_key, wanted in candidates_by_cols.items():
        found = extras_by_cols.get(column_key)
        if not found or len(wanted) != 1 or len(found) != 1:
            continue
        constraint, old_name = wanted[0], found[0]

        if not constraint.index_only:
            record_rename(constraint, old_name, ConstraintDrift)
            continue

        # For index-only uniques equal columns aren't enough — the condition
        # or opclass may have changed. Require the full definition to match;
        # otherwise both sides fall through as missing + undeclared.
        old_def = actual_dict[old_name].definition
        if not old_def:
            continue
        expected_tail = _canonicalize_index_def(
            cursor,
            model,
            expressions=constraint.expressions,
            fields_orders=[(f, "") for f in constraint.fields],
            opclasses=list(constraint.opclasses or ()),
            condition=constraint.condition,
            include=constraint.include,
            unique=True,
        )
        if _index_def_tail(old_def) != expected_tail:
            continue
        record_rename(constraint, old_name, IndexDrift)

    # Phase 2: expression-based constraints, keyed by canonical index body.
    # The extras side is already introspected and cheap, so build it first;
    # with nothing to match against, the per-constraint round-trip is skipped.
    extras_by_tail: dict[str, list[str]] = {}
    for extra_name in extra_names:
        if extra_name in renamed_extra:
            continue
        definition = actual_dict[extra_name].definition
        if definition:
            extras_by_tail.setdefault(_index_def_tail(definition), []).append(
                extra_name
            )

    if not extras_by_tail:
        return statuses, renamed_missing, renamed_extra

    candidates_by_tail: dict[str, list[UniqueConstraint]] = {}
    for constraint in missing:
        if constraint.fields or constraint.name in renamed_missing:
            continue
        # No fields on this path, so fields_orders is left at its default.
        expected_tail = _canonicalize_index_def(
            cursor,
            model,
            expressions=constraint.expressions,
            opclasses=list(constraint.opclasses),
            condition=constraint.condition,
            include=constraint.include,
            unique=True,
        )
        # An empty result is the "canonicalization failed" sentinel; it is
        # never used as a grouping key (see _compare_indexes for the reason).
        if expected_tail:
            candidates_by_tail.setdefault(expected_tail, []).append(constraint)

    for tail, wanted in candidates_by_tail.items():
        found = extras_by_tail.get(tail)
        if found and len(wanted) == 1 and len(found) == 1:
            # Index-only uniques live as indexes, not constraints, so
            # IndexDrift makes the planner use ALTER INDEX RENAME.
            record_rename(wanted[0], found[0], IndexDrift)

    return statuses, renamed_missing, renamed_extra
1382
1383
def _detect_check_renames(
    cursor: CursorWrapper,
    missing: list[CheckConstraint],
    extra_names: list[str],
    actual_dict: dict[str, ConstraintState],
    model: type[Model],
    table: str,
) -> tuple[list[ConstraintStatus], set[str], set[str]]:
    """Pair missing and extra check constraints with equal canonical text.

    A pairing is accepted only when exactly one missing constraint and one
    extra constraint share a definition. Returns the rename statuses plus
    the sets of model-side and database-side names consumed by a rename.
    """
    statuses: list[ConstraintStatus] = []
    renamed_missing: set[str] = set()
    renamed_extra: set[str] = set()

    extras_by_def: dict[str, list[str]] = {}
    for extra_name in extra_names:
        live_def = actual_dict[extra_name].definition
        if live_def:
            extras_by_def.setdefault(live_def, []).append(extra_name)

    # With no extras to match against (the steady-state, no-drift path) the
    # round-trip canonicalization of the model side is skipped entirely.
    if not extras_by_def:
        return statuses, renamed_missing, renamed_extra

    candidates_by_def: dict[str, list[CheckConstraint]] = {}
    for constraint in missing:
        model_def = _get_expected_check_definition(cursor, model, constraint)
        # "" means canonicalization failed. Grouping those under one key
        # would conflate unrelated constraints and block rename detection
        # for the rest, so they are left out.
        if model_def:
            candidates_by_def.setdefault(model_def, []).append(constraint)

    for model_def, wanted in candidates_by_def.items():
        found = extras_by_def.get(model_def)
        if not found or len(wanted) != 1 or len(found) != 1:
            continue
        constraint, old_name = wanted[0], found[0]
        statuses.append(
            ConstraintStatus(
                name=constraint.name,
                constraint_type=ConType.CHECK,
                fields=[],
                issue=f"rename from {old_name}",
                drift=ConstraintDrift(
                    kind=DriftKind.RENAMED,
                    table=table,
                    old_name=old_name,
                    new_name=constraint.name,
                ),
            )
        )
        renamed_missing.add(constraint.name)
        renamed_extra.add(old_name)

    return statuses, renamed_missing, renamed_extra
1438
1439
def _compare_index_only_unique(
    cursor: CursorWrapper,
    model: type[Model],
    constraint: UniqueConstraint,
    actual_state: ConstraintState,
    table: str,
) -> tuple[str | None, ConstraintDrift | None]:
    """Check an index-only unique constraint against its database state.

    Index-only variants (condition, expressions, opclasses) exist in
    PostgreSQL as unique indexes rather than pg_constraint rows, so their
    ConstraintState originates from the pg_index query path and carries a
    pg_get_indexdef definition.

    Returns ``(issue, drift)`` when the definitions differ, and
    ``(None, None)`` when they match or no database definition is available.
    """
    live_def = actual_state.definition
    if not live_def:
        return None, None

    issue = _compare_canonical_index(
        cursor=cursor,
        model=model,
        expressions=constraint.expressions,
        fields_orders=[(f, "") for f in constraint.fields],
        opclasses=list(constraint.opclasses),
        condition=constraint.condition,
        include=constraint.include,
        actual_def=live_def,
        unique=True,
    )
    if not issue:
        return None, None

    return issue, ConstraintDrift(
        kind=DriftKind.CHANGED, table=table, constraint=constraint, model=model
    )
1475
1476
def _compare_canonical_index(
    *,
    cursor: CursorWrapper,
    model: type[Model],
    expressions: tuple[Expression | ReplaceableExpression, ...],
    fields_orders: list[tuple[str, str]],
    opclasses: list[str],
    condition: Q | None,
    include: tuple[str, ...] | None,
    actual_def: str,
    unique: bool = False,
) -> str | None:
    """Check a model index/constraint against pg_get_indexdef text.

    The model side is round-tripped through Postgres so that both sides are
    pg_get_indexdef output; the canonical `USING ...` bodies are then
    compared as strings.

    Returns None when the bodies match, otherwise a message describing the
    difference.
    """
    model_body = _canonicalize_index_def(
        cursor,
        model,
        expressions=expressions,
        fields_orders=fields_orders,
        opclasses=opclasses,
        condition=condition,
        include=include,
        unique=unique,
    )
    db_body = _index_def_tail(actual_def)

    if model_body and db_body == model_body:
        return None

    if not model_body:
        # The round-trip couldn't complete (model SQL incompatible with the
        # live shape). Drift is still reported, but only the database text
        # is available for the diagnostic.
        return f"definition differs: DB has {db_body!r}"

    return (
        f"definition differs: DB has {db_body!r}, "
        f"model expects {model_body!r}"
    )
1519
1520
# Round-trip canonicalization: feed model-side SQL to Postgres on a
# session-private temp table, read back via pg_get_*.
# Name of the temp table that _canon_table creates and drops.
_CANON_TABLE = "_plain_canon"
# Name of the throwaway constraint that _canonicalize_constraint_def adds to
# the temp table and then looks up in pg_constraint.
_CANON_CONSTRAINT = "_c"
# Name for the throwaway index — presumably created by
# _canonicalize_index_def; TODO confirm against that helper.
_CANON_INDEX = "_canon_ix"

# Errors raised by Postgres when the model SQL is incompatible with the live
# table shape (unmigrated column types, references to dropped columns, etc.).
# Helpers catch these and return "" so drift is still reported via inequality.
# DataError covers literal-cast mismatches (e.g. text default on int column);
# NotSupportedError covers the rare PG-side "this combination isn't allowed"
# rejection. Both are narrow enough that catching the parent class is fine.
# ProgrammingError is intentionally narrowed to specific 42xxx subclasses —
# privilege failures (InsufficientPrivilege, also a ProgrammingError) and
# plain-side syntax bugs must propagate so users get a clear diagnostic
# instead of silent drift noise.
_CANON_FALLBACK_ERRORS: tuple[type[Exception], ...] = (
    psycopg.errors.UndefinedColumn,
    psycopg.errors.UndefinedFunction,
    psycopg.errors.UndefinedObject,
    psycopg.errors.UndefinedTable,
    psycopg.errors.InvalidColumnReference,
    psycopg.errors.InvalidObjectDefinition,
    psycopg.errors.InvalidTableDefinition,
    psycopg.errors.DatatypeMismatch,
    psycopg.errors.WrongObjectType,
    psycopg.errors.AmbiguousColumn,
    psycopg.errors.DataError,
    psycopg.errors.NotSupportedError,
)
1551
1552
class ReadOnlyConnectionError(RuntimeError):
    """Convergence analysis was attempted on a connection that rejects DDL.

    To canonicalize the model side of a comparison, analysis round-trips
    SQL through a session-private temp table. Creating that table is DDL,
    which standby connections and `read_only()` blocks refuse.
    """
1560
1561
@contextmanager
def _canon_table(cursor: CursorWrapper, model: type[Model]) -> Iterator[None]:
    """Provide a session-private temp table shaped like the model's table.

    The work runs inside `cursor.connection.transaction()`, which issues a
    SAVEPOINT when nested (or BEGIN under autocommit). If the caller's model
    SQL is incompatible with the live column shape (e.g. a CHECK on a column
    whose live type differs from what the model now declares), the failure
    rolls back to this scope instead of poisoning the surrounding analyze
    transaction. Callers catch the psycopg error and fall back to a
    sentinel, so drift is still reported — just without canonical model text.

    The final DROP is qualified with `pg_temp` so that a real table of the
    same name in the user's own schema can never be dropped by mistake. One
    connection is always single-threaded, so sharing the temp-table name
    between helpers is safe.

    Raises:
        ReadOnlyConnectionError: if the connection refuses the DDL with
            psycopg's ReadOnlySqlTransaction.
    """
    source_table = quote_name(model.model_options.db_table)
    create_sql = f"CREATE TEMP TABLE {_CANON_TABLE} (LIKE {source_table})"
    drop_sql = f"DROP TABLE pg_temp.{_CANON_TABLE}"
    try:
        with cursor.connection.transaction():
            cursor.execute(create_sql)
            yield
            cursor.execute(drop_sql)
    except psycopg.errors.ReadOnlySqlTransaction as read_only_exc:
        raise ReadOnlyConnectionError(
            "Convergence analysis requires write access — it canonicalizes "
            "model SQL by creating a session-private temp table. The current "
            "connection rejected DDL (read-only transaction or standby). Run "
            "analysis against a primary/writable connection."
        ) from read_only_exc
1592
1593
def _canonicalize_constraint_def(
    cursor: CursorWrapper, model: type[Model], constraint_clause: str
) -> str:
    """Return Postgres' own deparsed text for a model constraint clause.

    The clause is added to the canonicalization temp table and read back
    with pg_get_constraintdef, so that it can be compared by string equality
    with the live constraint's text. That equality then absorbs `IN` vs
    `= ANY (ARRAY[...])`, redundant parentheses, type-cast drift, INCLUDE
    column ordering, and similar spelling differences.

    Returns "" when the model SQL is incompatible with the live table shape
    (e.g. unmigrated column-type drift). Drift is still reported because ""
    is unequal to the live definition; only the canonical model text is
    missing from the diagnostic.
    """
    # Added as validated: the temp table is empty, so the implicit scan is
    # instant. NOT VALID would make pg_get_constraintdef append a trailing
    # " NOT VALID" that the live constraint's text won't have.
    add_sql = (
        f"ALTER TABLE {_CANON_TABLE} "
        f"ADD CONSTRAINT {_CANON_CONSTRAINT} {constraint_clause}"
    )
    lookup_sql = (
        "SELECT pg_get_constraintdef(c.oid) FROM pg_constraint c "
        "WHERE c.conname = %s "
        "AND c.conrelid = (SELECT oid FROM pg_class WHERE relname = %s "
        "AND relnamespace = pg_my_temp_schema())"
    )
    try:
        with _canon_table(cursor, model):
            cursor.execute(add_sql)
            cursor.execute(lookup_sql, [_CANON_CONSTRAINT, _CANON_TABLE])
            row = cursor.fetchone()
    except _CANON_FALLBACK_ERRORS:
        return ""

    if row:
        return row[0]
    return ""
1627
1628
def _get_expected_check_definition(
    cursor: CursorWrapper, model: type[Model], constraint: CheckConstraint
) -> str:
    """Canonical CHECK definition for the model's constraint.

    Compiles `constraint.check` to SQL, wraps it in `CHECK (...)`, and hands
    the clause to `_canonicalize_constraint_def`, whose result (possibly "")
    is returned unchanged.
    """
    check_clause = f"CHECK ({compile_expression_sql(model, constraint.check)})"
    return _canonicalize_constraint_def(cursor, model, check_clause)
1634
1635
def _canonicalize_index_def(
    cursor: CursorWrapper,
    model: type[Model],
    *,
    expressions: tuple[Expression | ReplaceableExpression, ...] = (),
    fields_orders: list[tuple[str, str]] | None = None,
    opclasses: list[str] | None = None,
    condition: Q | None = None,
    include: tuple[str, ...] | None = None,
    unique: bool = False,
) -> str:
    """Build the index on the scratch canon table and return its canonical body.

    The index is created from either `expressions` (which take precedence
    when non-empty) or `fields_orders`, then read back via pg_get_indexdef.
    Only the tail starting at `USING ` is returned — the
    `CREATE [UNIQUE] INDEX <name> ON <table>` prefix is dropped by
    `_index_def_tail` — so the result can be compared directly with
    `_index_def_tail(actual_def)` taken from the live database.

    `opclasses`, when given, is indexed in parallel with `fields_orders`.

    Returns "" when one of `_CANON_FALLBACK_ERRORS` is raised (e.g. the model
    SQL is incompatible with the live table shape) or when the lookup finds
    no row; callers then observe inequality and report drift without the
    canonical model text.
    """
    if expressions:
        columns_sql = compile_index_expressions_sql(model, expressions)
    else:
        rendered_columns: list[str] = []
        for position, (field_name, order_suffix) in enumerate(fields_orders or []):
            model_field = model._model_meta.get_forward_field(field_name)
            quoted = quote_name(model_field.column)
            # Column, then optional operator class, then optional ordering.
            opclass_part = f" {opclasses[position]}" if opclasses else ""
            order_part = f" {order_suffix}" if order_suffix else ""
            rendered_columns.append(f"{quoted}{opclass_part}{order_part}")
        columns_sql = ", ".join(rendered_columns)

    if condition is not None:
        where_sql = f" WHERE ({compile_expression_sql(model, condition)})"
    else:
        where_sql = ""
    include_sql = build_include_sql(model, include or ())

    if unique:
        create_kw = "CREATE UNIQUE INDEX"
    else:
        create_kw = "CREATE INDEX"
    create_index_sql = (
        f"{create_kw} {_CANON_INDEX} ON {_CANON_TABLE} "
        f"({columns_sql}){include_sql}{where_sql}"
    )
    # Restrict the lookup to the session's temp schema so only the scratch
    # index can match.
    read_back_sql = (
        "SELECT pg_get_indexdef(c.oid) FROM pg_class c "
        "WHERE c.relname = %s AND c.relnamespace = pg_my_temp_schema()"
    )

    try:
        with _canon_table(cursor, model):
            cursor.execute(create_index_sql)
            cursor.execute(read_back_sql, [_CANON_INDEX])
            found = cursor.fetchone()
            if found:
                return _index_def_tail(found[0])
            return ""
    except _CANON_FALLBACK_ERRORS:
        return ""
1693
1694
1695def _index_def_tail(definition: str) -> str:
1696    """Strip the `CREATE [UNIQUE] INDEX <name> ON <schema>.<table>` prefix
1697    from a pg_get_indexdef output, leaving the canonical body that's safe to
1698    compare across different index names/tables."""
1699    using_pos = definition.find("USING ")
1700    return definition[using_pos:] if using_pos >= 0 else definition
1701
1702
def _get_expected_unique_definition(
    cursor: CursorWrapper, model: type[Model], constraint: UniqueConstraint
) -> str:
    """Canonical UNIQUE definition for the model's constraint, as printed by
    Postgres.

    Only field-based unique constraints (optionally with INCLUDE and
    DEFERRABLE) are stored by PostgreSQL in pg_constraint. Expression-based,
    conditional, and opclass constraints exist as indexes only and are
    compared through the index-definition path instead.

    The clause is assembled from the constraint's fields, include columns and
    deferrable setting, then passed to `_canonicalize_constraint_def`, whose
    result (possibly "") is returned unchanged.
    """
    quoted_columns: list[str] = []
    for field_name in constraint.fields:
        model_field = model._model_meta.get_forward_field(field_name)
        quoted_columns.append(quote_name(model_field.column))
    column_list = ", ".join(quoted_columns)

    include_sql = build_include_sql(model, constraint.include)
    defer_sql = deferrable_sql(constraint.deferrable)
    unique_clause = f"UNIQUE ({column_list}){include_sql}{defer_sql}"
    return _canonicalize_constraint_def(cursor, model, unique_clause)