1from __future__ import annotations
2
3import json
4import re
5from collections.abc import Iterator
6from contextlib import contextmanager
7from dataclasses import dataclass, field
8from enum import StrEnum
9from functools import cached_property
10from typing import TYPE_CHECKING, Any
11
12import psycopg
13
14from ..constraints import CheckConstraint, UniqueConstraint
15from ..ddl import (
16 build_include_sql,
17 compile_database_default_sql,
18 compile_expression_sql,
19 compile_index_expressions_sql,
20 compile_literal_default_sql,
21 deferrable_sql,
22)
23from ..deletion import sql_on_delete
24from ..dialect import quote_name
25from ..fields.base import ColumnField
26from ..fields.related import ForeignKeyField
27from ..indexes import Index
28from ..introspection import (
29 MANAGED_CONSTRAINT_TYPES,
30 MANAGED_INDEX_ACCESS_METHODS,
31 ColumnState,
32 ConstraintState,
33 ConType,
34 TableState,
35 introspect_table,
36)
37
38if TYPE_CHECKING:
39 from ..base import Model
40 from ..connection import DatabaseConnection
41 from ..expressions import Expression, ReplaceableExpression
42 from ..fields import Field
43 from ..query_utils import Q
44 from ..utils import CursorWrapper
45
46
47# Drift types — semantic descriptions of schema differences
48
49
50class DriftKind(StrEnum):
51 MISSING = "missing"
52 INVALID = "invalid"
53 CHANGED = "changed"
54 RENAMED = "renamed"
55 UNDECLARED = "undeclared"
56 UNVALIDATED = "unvalidated"
57
58
59@dataclass
60class IndexDrift:
61 """A schema difference for an index."""
62
63 kind: DriftKind
64 table: str
65 index: Index | None = None
66 model: type[Model] | None = None
67 old_name: str | None = None
68 new_name: str | None = None
69 name: str | None = None
70
71 def describe(self) -> str:
72 match self.kind:
73 case DriftKind.MISSING:
74 assert self.index is not None
75 return f"{self.table}: index {self.index.name} missing"
76 case DriftKind.INVALID:
77 assert self.index is not None
78 return f"{self.table}: index {self.index.name} INVALID"
79 case DriftKind.CHANGED:
80 assert self.index is not None
81 return f"{self.table}: index {self.index.name} definition changed"
82 case DriftKind.RENAMED:
83 return f"{self.table}: index {self.old_name} → {self.new_name}"
84 case _:
85 return f"{self.table}: index {self.name} not declared"
86
87
88@dataclass
89class ConstraintDrift:
90 """A schema difference for a constraint."""
91
92 kind: DriftKind
93 table: str
94 constraint: CheckConstraint | UniqueConstraint | None = None
95 model: type[Model] | None = None
96 old_name: str | None = None
97 new_name: str | None = None
98 name: str | None = None
99
100 def describe(self) -> str:
101 match self.kind:
102 case DriftKind.MISSING:
103 assert self.constraint is not None
104 return f"{self.table}: constraint {self.constraint.name} missing"
105 case DriftKind.UNVALIDATED:
106 return f"{self.table}: constraint {self.name} NOT VALID"
107 case DriftKind.CHANGED:
108 assert self.constraint is not None
109 return f"{self.table}: constraint {self.constraint.name} definition changed"
110 case DriftKind.RENAMED:
111 return f"{self.table}: constraint {self.old_name} → {self.new_name}"
112 case _:
113 return f"{self.table}: constraint {self.name} not declared"
114
115
116@dataclass
117class ForeignKeyDrift:
118 """A schema difference for a foreign key constraint."""
119
120 kind: DriftKind
121 table: str
122 name: str | None = None
123 column: str | None = None
124 target_table: str | None = None
125 target_column: str | None = None
126 on_delete_clause: str = "" # SQL clause to emit, e.g. " ON DELETE CASCADE"
127 actual_action: str | None = None # CHANGED only: current DB confdeltype
128 expected_action: str | None = None # CHANGED only: expected confdeltype
129
130 def describe(self) -> str:
131 match self.kind:
132 case DriftKind.MISSING:
133 return f"{self.table}: FK {self.name} missing ({self.column} → {self.target_table}.{self.target_column})"
134 case DriftKind.UNVALIDATED:
135 return f"{self.table}: FK {self.name} NOT VALID"
136 case DriftKind.CHANGED:
137 return (
138 f"{self.table}: FK {self.name} on_delete changed "
139 f"({self.actual_action!r} → {self.expected_action!r})"
140 )
141 case _:
142 return f"{self.table}: FK {self.name} not declared"
143
144
145@dataclass
146class NullabilityDrift:
147 """Mismatch between model and DB column nullability."""
148
149 table: str
150 column: str
151 model_allows_null: bool
152 has_null_rows: bool = False # Only checked when model_allows_null is False
153
154 def describe(self) -> str:
155 if not self.model_allows_null:
156 if self.has_null_rows:
157 return (
158 f"{self.table}: column {self.column} allows NULL (NULL rows exist)"
159 )
160 return f"{self.table}: column {self.column} allows NULL"
161 return f"{self.table}: column {self.column} is NOT NULL, model allows NULL"
162
163
164@dataclass
165class ColumnDefaultDrift:
166 """Mismatch between the model's declared default and the DB column DEFAULT."""
167
168 kind: DriftKind
169 table: str
170 column: str
171 db_default_sql: str | None
172 model_default_sql: str | None
173
174 def describe(self) -> str:
175 match self.kind:
176 case DriftKind.MISSING:
177 return (
178 f"{self.table}: column {self.column} missing DEFAULT "
179 f"(expected {self.model_default_sql})"
180 )
181 case DriftKind.CHANGED:
182 return (
183 f"{self.table}: column {self.column} DEFAULT mismatch — "
184 f"db has {self.db_default_sql}, model declares "
185 f"{self.model_default_sql}"
186 )
187 case _:
188 return (
189 f"{self.table}: column {self.column} has undeclared DEFAULT "
190 f"{self.db_default_sql}"
191 )
192
193
194@dataclass
195class StorageParameterDrift:
196 """Mismatch between declared and live `pg_class.reloptions` for a table.
197
198 `key` carries a `toast.` prefix when the parameter belongs to the table's
199 TOAST relation; convergence emits and reads it accordingly.
200 """
201
202 kind: DriftKind
203 table: str
204 key: str
205 declared_value: str | None = None
206 actual_value: str | None = None
207
208 def describe(self) -> str:
209 match self.kind:
210 case DriftKind.MISSING:
211 return (
212 f"{self.table}: storage parameter {self.key} missing "
213 f"(expected {self.declared_value})"
214 )
215 case DriftKind.CHANGED:
216 return (
217 f"{self.table}: storage parameter {self.key} mismatch — "
218 f"db has {self.actual_value}, model declares "
219 f"{self.declared_value}"
220 )
221 case _:
222 return (
223 f"{self.table}: storage parameter {self.key} not declared "
224 f"(db has {self.actual_value})"
225 )
226
227
228type ColumnDrift = NullabilityDrift | ColumnDefaultDrift
229type Drift = (
230 IndexDrift | ConstraintDrift | ForeignKeyDrift | ColumnDrift | StorageParameterDrift
231)
232
233
234# Status objects — analysis results with optional drift
235
236
237@dataclass
238class ColumnStatus:
239 name: str
240 field_name: str
241 type: str
242 nullable: bool
243 primary_key: bool
244 pk_suffix: str
245 issue: str | None = None
246 drifts: list[ColumnDrift] = field(default_factory=list)
247
248
249@dataclass
250class IndexStatus:
251 name: str
252 fields: list[str] = field(default_factory=list)
253 issue: str | None = None
254 drift: IndexDrift | None = None
255 access_method: str | None = None # set for unmanaged indexes (display only)
256
257
258@dataclass
259class ConstraintStatus:
260 name: str
261 constraint_type: ConType
262 fields: list[str] = field(default_factory=list)
263 issue: str | None = None
264 drift: ConstraintDrift | ForeignKeyDrift | IndexDrift | None = None
265
266
267@dataclass
268class ModelAnalysis:
269 label: str
270 table: str
271 table_issues: list[str] = field(default_factory=list)
272 columns: list[ColumnStatus] = field(default_factory=list)
273 indexes: list[IndexStatus] = field(default_factory=list)
274 constraints: list[ConstraintStatus] = field(default_factory=list)
275 storage_parameter_drifts: list[StorageParameterDrift] = field(default_factory=list)
276
277 @cached_property
278 def drifts(self) -> list[Drift]:
279 """All detected schema drifts."""
280 result: list[Drift] = []
281 for col in self.columns:
282 result.extend(col.drifts)
283 for idx in self.indexes:
284 if idx.drift:
285 result.append(idx.drift)
286 for con in self.constraints:
287 if con.drift:
288 result.append(con.drift)
289 result.extend(self.storage_parameter_drifts)
290 return result
291
292 @cached_property
293 def issue_count(self) -> int:
294 """Total issues (table + columns + indexes + constraints + storage)."""
295 count = len(self.table_issues)
296 count += sum(1 for col in self.columns if col.issue)
297 count += sum(1 for idx in self.indexes if idx.issue)
298 count += sum(1 for con in self.constraints if con.issue)
299 count += len(self.storage_parameter_drifts)
300 return count
301
302 def to_dict(self) -> dict[str, Any]:
303 """Serialize for --json output."""
304 return {
305 "label": self.label,
306 "table": self.table,
307 "table_issues": self.table_issues,
308 "columns": [
309 {
310 "name": col.name,
311 "field_name": col.field_name,
312 "type": col.type,
313 "nullable": col.nullable,
314 "primary_key": col.primary_key,
315 "pk_suffix": col.pk_suffix,
316 "issue": col.issue,
317 "drifts": [d.describe() for d in col.drifts],
318 }
319 for col in self.columns
320 ],
321 "indexes": [
322 {
323 "name": idx.name,
324 "fields": idx.fields,
325 "access_method": idx.access_method,
326 "issue": idx.issue,
327 "drift": idx.drift.describe() if idx.drift else None,
328 }
329 for idx in self.indexes
330 ],
331 "constraints": [
332 {
333 "name": con.name,
334 "constraint_type": con.constraint_type,
335 "type_label": con.constraint_type.label,
336 "fields": con.fields,
337 "issue": con.issue,
338 "drift": con.drift.describe() if con.drift else None,
339 }
340 for con in self.constraints
341 ],
342 "storage_parameter_drifts": [
343 {
344 "key": d.key,
345 "kind": d.kind,
346 "declared_value": d.declared_value,
347 "actual_value": d.actual_value,
348 }
349 for d in self.storage_parameter_drifts
350 ],
351 }
352
353
354def analyze_model(
355 conn: DatabaseConnection, cursor: CursorWrapper, model: type[Model]
356) -> ModelAnalysis:
357 """Compare a model against the database and classify each difference.
358
359 Introspects the actual table state, compares it against model definitions,
360 and produces a ModelAnalysis where each column/index/constraint carries its
361 issue (if any) and drift object (if schema differs).
362 """
363 table_name = model.model_options.db_table
364 db = introspect_table(conn, cursor, table_name)
365
366 if not db.exists:
367 return ModelAnalysis(
368 label=model.model_options.label,
369 table=table_name,
370 table_issues=["table missing from database"],
371 )
372
373 return ModelAnalysis(
374 label=model.model_options.label,
375 table=table_name,
376 columns=_compare_columns(model, db, table_name, cursor),
377 indexes=_compare_indexes(cursor, model, db, table_name),
378 constraints=_compare_constraints(cursor, model, db, table_name),
379 storage_parameter_drifts=_compare_storage_parameters(model, db, table_name),
380 )
381
382
383def _compare_storage_parameters(
384 model: type[Model], db: TableState, table: str
385) -> list[StorageParameterDrift]:
386 """Diff declared `model_options.storage_parameters` against `pg_class.reloptions`.
387
388 Declared keys missing from the DB → MISSING. Mismatched values → CHANGED.
389 Live keys not declared → UNDECLARED (so convergence can RESET them, keeping
390 the model as the source of truth — matches how indexes/constraints work).
391 """
392 declared = model.model_options.storage_parameters
393 actual = db.storage_parameters
394 drifts: list[StorageParameterDrift] = []
395
396 for key, declared_value in declared.items():
397 actual_value = actual.get(key)
398 if actual_value is None:
399 drifts.append(
400 StorageParameterDrift(
401 kind=DriftKind.MISSING,
402 table=table,
403 key=key,
404 declared_value=declared_value,
405 )
406 )
407 elif actual_value != declared_value:
408 drifts.append(
409 StorageParameterDrift(
410 kind=DriftKind.CHANGED,
411 table=table,
412 key=key,
413 declared_value=declared_value,
414 actual_value=actual_value,
415 )
416 )
417
418 for key, actual_value in actual.items():
419 if key not in declared:
420 drifts.append(
421 StorageParameterDrift(
422 kind=DriftKind.UNDECLARED,
423 table=table,
424 key=key,
425 actual_value=actual_value,
426 )
427 )
428
429 return drifts
430
431
432# Column comparison
433
434
435def _column_has_nulls(cursor: CursorWrapper, table: str, column: str) -> bool:
436 """Check whether any NULL values exist in a column."""
437 cursor.execute(
438 f"SELECT 1 FROM {quote_name(table)} WHERE {quote_name(column)} IS NULL LIMIT 1"
439 )
440 return cursor.fetchone() is not None
441
442
443def _compare_columns(
444 model: type[Model], db: TableState, table: str, cursor: CursorWrapper
445) -> list[ColumnStatus]:
446 statuses: list[ColumnStatus] = []
447 expected_col_names: set[str] = set()
448
449 for f in model._model_meta.local_fields:
450 if not isinstance(f, ColumnField):
451 continue
452 db_type = f.db_type()
453 if db_type is None:
454 continue
455
456 expected_col_names.add(f.column)
457 issue: str | None = None
458 drifts: list[ColumnDrift] = []
459
460 if f.column not in db.columns:
461 issue = "missing from database"
462 else:
463 actual = db.columns[f.column]
464 if db_type != actual.type:
465 issue = f"expected {db_type}, actual {actual.type}"
466 elif not f.allow_null and not actual.not_null:
467 # Model says NOT NULL, DB allows NULL — semantic drift
468 has_nulls = _column_has_nulls(cursor, table, f.column)
469 if has_nulls:
470 issue = "expected NOT NULL, actual NULL (NULL rows exist)"
471 else:
472 issue = "expected NOT NULL, actual NULL"
473 drifts.append(
474 NullabilityDrift(
475 table=table,
476 column=f.column,
477 model_allows_null=False,
478 has_null_rows=has_nulls,
479 )
480 )
481 elif f.allow_null and actual.not_null:
482 issue = "expected NULL, actual NOT NULL"
483 drifts.append(
484 NullabilityDrift(
485 table=table,
486 column=f.column,
487 model_allows_null=True,
488 )
489 )
490
491 if default_drift := _compare_column_default(cursor, f, actual, table):
492 drifts.append(default_drift)
493 if issue is None:
494 issue = default_drift.describe()
495
496 pk_suffix = ""
497 if f.primary_key:
498 pk_suffix = f.db_type_suffix() or ""
499
500 assert f.name is not None
501 statuses.append(
502 ColumnStatus(
503 name=f.column,
504 field_name=f.name,
505 type=db_type,
506 nullable=f.allow_null,
507 primary_key=f.primary_key,
508 pk_suffix=pk_suffix,
509 issue=issue,
510 drifts=drifts,
511 )
512 )
513
514 for col_name in sorted(db.columns.keys() - expected_col_names):
515 actual = db.columns[col_name]
516 statuses.append(
517 ColumnStatus(
518 name=col_name,
519 field_name="",
520 type=actual.type,
521 nullable=not actual.not_null,
522 primary_key=False,
523 pk_suffix="",
524 issue="extra column, not in model",
525 )
526 )
527
528 return statuses
529
530
531_JSONB_LITERAL_RE = re.compile(r"^\s*'((?:[^']|'')*)'::jsonb\s*$", re.IGNORECASE)
532
533
534def _extract_jsonb_literal(sql: str) -> str | None:
535 m = _JSONB_LITERAL_RE.match(sql)
536 if m is None:
537 return None
538 return m.group(1).replace("''", "'")
539
540
541def _canonicalize_default_expr(
542 cursor: CursorWrapper, model: type[Model], column: str, default_sql: str
543) -> str:
544 """Round-trip a column DEFAULT through Postgres so both sides of the
545 comparison go through pg_get_expr — the live default comes from
546 `pg_attrdef.adbin` during introspection, and the expected side is set on
547 the temp table column then read the same way.
548
549 Returns "" if the model SQL is incompatible with the live column shape;
550 comparison then sees inequality and reports drift.
551 """
552 try:
553 with _canon_table(cursor, model):
554 cursor.execute(
555 f"ALTER TABLE {_CANON_TABLE} ALTER COLUMN {quote_name(column)} "
556 f"SET DEFAULT {default_sql}"
557 )
558 cursor.execute(
559 "SELECT pg_get_expr(ad.adbin, ad.adrelid) FROM pg_attrdef ad "
560 "JOIN pg_attribute a ON a.attnum = ad.adnum AND a.attrelid = ad.adrelid "
561 "WHERE a.attrelid = (SELECT oid FROM pg_class WHERE relname = %s "
562 "AND relnamespace = pg_my_temp_schema()) "
563 "AND a.attname = %s",
564 [_CANON_TABLE, column],
565 )
566 row = cursor.fetchone()
567 return row[0] if row else ""
568 except _CANON_FALLBACK_ERRORS:
569 return ""
570
571
572def _compare_column_default(
573 cursor: CursorWrapper, field: Field, actual: ColumnState, table: str
574) -> ColumnDefaultDrift | None:
575 expected_sql: str | None = None
576 db_default_expr = field.get_db_default_expression()
577 if db_default_expr is not None:
578 expected_sql = compile_database_default_sql(db_default_expr)
579 elif field.has_persistent_literal_default():
580 expected_sql = compile_literal_default_sql(field)
581
582 if expected_sql is not None:
583 if actual.default_sql is None:
584 return ColumnDefaultDrift(
585 kind=DriftKind.MISSING,
586 table=table,
587 column=field.column,
588 db_default_sql=None,
589 model_default_sql=expected_sql,
590 )
591
592 canonical_expected = _canonicalize_default_expr(
593 cursor, field.model, field.column, expected_sql
594 )
595 if canonical_expected == actual.default_sql:
596 return None
597 # Semantic compare for jsonb — PG canonicalizes object keys, which
598 # won't match Python's dict-insertion order even after a round-trip.
599 m_json = _extract_jsonb_literal(canonical_expected)
600 d_json = _extract_jsonb_literal(actual.default_sql)
601 if m_json is not None and d_json is not None:
602 try:
603 if json.loads(m_json) == json.loads(d_json):
604 return None
605 except json.JSONDecodeError:
606 pass
607
608 return ColumnDefaultDrift(
609 kind=DriftKind.CHANGED,
610 table=table,
611 column=field.column,
612 db_default_sql=actual.default_sql,
613 model_default_sql=expected_sql,
614 )
615
616 if actual.default_sql is None:
617 return None
618
619 return ColumnDefaultDrift(
620 kind=DriftKind.UNDECLARED,
621 table=table,
622 column=field.column,
623 db_default_sql=actual.default_sql,
624 model_default_sql=None,
625 )
626
627
628# Index comparison with rename detection
629
630
631def _compare_indexes(
632 cursor: CursorWrapper, model: type[Model], db: TableState, table: str
633) -> list[IndexStatus]:
634 statuses: list[IndexStatus] = []
635 missing: list[Index] = []
636 model_index_names = {idx.name for idx in model.model_options.indexes}
637 # Unique indexes are handled by _compare_unique_constraints, not here.
638 # Also exclude indexes that back unique constraints in pg_constraint.
639 unique_constraint_names = {
640 k for k, v in db.constraints.items() if v.constraint_type == ConType.UNIQUE
641 }
642 non_unique_indexes = {
643 k: v
644 for k, v in db.indexes.items()
645 if not v.is_unique and k not in unique_constraint_names
646 }
647
648 for index in model.model_options.indexes:
649 if index.name not in non_unique_indexes:
650 missing.append(index)
651 continue
652
653 db_idx = non_unique_indexes[index.name]
654
655 # Name collision: DB has an unmanaged index type with this name
656 if db_idx.access_method not in MANAGED_INDEX_ACCESS_METHODS:
657 statuses.append(
658 IndexStatus(
659 name=index.name,
660 fields=list(index.fields),
661 issue=f"name conflict with {db_idx.access_method} index — rename one to resolve",
662 )
663 )
664 continue
665
666 if not db_idx.is_valid:
667 statuses.append(
668 IndexStatus(
669 name=index.name,
670 fields=list(index.fields),
671 issue="INVALID — needs drop and recreate",
672 drift=IndexDrift(
673 kind=DriftKind.INVALID,
674 table=table,
675 index=index,
676 model=model,
677 ),
678 )
679 )
680 continue
681
682 # Check if definition matches
683 if db_idx.definition:
684 issue = _compare_canonical_index(
685 cursor=cursor,
686 model=model,
687 expressions=index.expressions,
688 fields_orders=list(index.fields_orders),
689 opclasses=list(index.opclasses),
690 condition=index.condition,
691 include=index.include,
692 actual_def=db_idx.definition,
693 )
694 if issue:
695 statuses.append(
696 IndexStatus(
697 name=index.name,
698 fields=list(index.fields),
699 issue=issue,
700 drift=IndexDrift(
701 kind=DriftKind.CHANGED,
702 table=table,
703 index=index,
704 model=model,
705 ),
706 )
707 )
708 continue
709
710 # Index exists and matches
711 statuses.append(
712 IndexStatus(
713 name=index.name,
714 fields=list(index.fields),
715 )
716 )
717
718 # Extra indexes (in DB but not in model)
719 extra_names = sorted(non_unique_indexes.keys() - model_index_names)
720
721 # Only managed index types participate in rename detection
722 managed_extra = [
723 n
724 for n in extra_names
725 if non_unique_indexes[n].access_method in MANAGED_INDEX_ACCESS_METHODS
726 ]
727
728 # Detect renames by canonical (round-tripped) index body. Build the cheap
729 # already-introspected side first so we can skip the per-missing
730 # canonicalization loop on the steady-state path.
731 renamed_missing: set[str] = set()
732 renamed_extra: set[str] = set()
733
734 extra_by_def: dict[str, list[str]] = {}
735 for name in managed_extra:
736 defn = non_unique_indexes[name].definition
737 if defn:
738 extra_by_def.setdefault(_index_def_tail(defn), []).append(name)
739
740 missing_by_def: dict[str, list[Index]] = {}
741 if extra_by_def:
742 for index in missing:
743 expected_tail = _canonicalize_index_def(
744 cursor,
745 model,
746 expressions=index.expressions,
747 fields_orders=list(index.fields_orders),
748 opclasses=list(index.opclasses),
749 condition=index.condition,
750 include=index.include,
751 )
752 if not expected_tail:
753 # Canonicalization failed; bucketing under "" would conflate
754 # multiple sentinel-failing indexes and disable rename
755 # detection for the rest.
756 continue
757 missing_by_def.setdefault(expected_tail, []).append(index)
758
759 for tail, m_list in missing_by_def.items():
760 e_list = extra_by_def.get(tail)
761 if e_list and len(m_list) == 1 and len(e_list) == 1:
762 index = m_list[0]
763 old_name = e_list[0]
764 statuses.append(
765 IndexStatus(
766 name=index.name,
767 fields=list(index.fields),
768 issue=f"rename from {old_name}",
769 drift=IndexDrift(
770 kind=DriftKind.RENAMED,
771 table=table,
772 old_name=old_name,
773 new_name=index.name,
774 ),
775 )
776 )
777 renamed_missing.add(index.name)
778 renamed_extra.add(old_name)
779
780 # Remaining unmatched missing
781 for index in missing:
782 if index.name not in renamed_missing:
783 statuses.append(
784 IndexStatus(
785 name=index.name,
786 fields=list(index.fields),
787 issue="missing from database",
788 drift=IndexDrift(
789 kind=DriftKind.MISSING,
790 table=table,
791 index=index,
792 model=model,
793 ),
794 )
795 )
796
797 # Extra managed indexes are undeclared
798 for name in managed_extra:
799 if name not in renamed_extra:
800 statuses.append(
801 IndexStatus(
802 name=name,
803 fields=non_unique_indexes[name].columns,
804 issue="not in model",
805 drift=IndexDrift(
806 kind=DriftKind.UNDECLARED,
807 table=table,
808 name=name,
809 ),
810 )
811 )
812
813 # Extra unmanaged indexes — informational only, no drift
814 for name in extra_names:
815 idx = non_unique_indexes[name]
816 if idx.access_method not in MANAGED_INDEX_ACCESS_METHODS:
817 statuses.append(
818 IndexStatus(
819 name=name,
820 fields=idx.columns,
821 access_method=idx.access_method,
822 )
823 )
824
825 return statuses
826
827
828# Constraint comparison
829
830
831def _compare_constraints(
832 cursor: CursorWrapper, model: type[Model], db: TableState, table: str
833) -> list[ConstraintStatus]:
834 statuses: list[ConstraintStatus] = []
835 statuses.extend(_compare_unique_constraints(cursor, model, db, table))
836 statuses.extend(_compare_check_constraints(cursor, model, db, table))
837 statuses.extend(_compare_foreign_keys(model, db, table))
838
839 # Unmanaged constraint types — informational only, no drift.
840 # Primary keys are also unmanaged but not shown.
841 for name, cs in db.constraints.items():
842 if (
843 cs.constraint_type not in MANAGED_CONSTRAINT_TYPES
844 and cs.constraint_type != ConType.PRIMARY
845 ):
846 statuses.append(
847 ConstraintStatus(
848 name=name,
849 constraint_type=cs.constraint_type,
850 fields=cs.columns,
851 )
852 )
853
854 return statuses
855
856
857def _compare_unique_constraints(
858 cursor: CursorWrapper, model: type[Model], db: TableState, table: str
859) -> list[ConstraintStatus]:
860 statuses: list[ConstraintStatus] = []
861 # Unique constraints from pg_constraint (contype='u')
862 actual_constraints = {
863 k: v for k, v in db.constraints.items() if v.constraint_type == ConType.UNIQUE
864 }
865 # Unique indexes from pg_index that don't have a backing pg_constraint
866 # (e.g. partial/expression unique indexes created with CREATE UNIQUE INDEX)
867 actual_indexes = {
868 k: ConstraintState(
869 constraint_type=ConType.UNIQUE,
870 columns=v.columns,
871 validated=True,
872 definition=v.definition,
873 )
874 for k, v in db.indexes.items()
875 if v.is_unique and k not in actual_constraints
876 }
877 actual = {**actual_constraints, **actual_indexes}
878 model_constraints = [
879 c for c in model.model_options.constraints if isinstance(c, UniqueConstraint)
880 ]
881 expected_names = {c.name for c in model_constraints}
882 extra_names = sorted(actual.keys() - expected_names)
883
884 missing: list[UniqueConstraint] = []
885 for constraint in model_constraints:
886 if constraint.name not in actual:
887 missing.append(constraint)
888 continue
889
890 issue: str | None = None
891 drift: ConstraintDrift | None = None
892
893 if not actual[constraint.name].validated:
894 issue = "NOT VALID — needs validation"
895 drift = ConstraintDrift(
896 kind=DriftKind.UNVALIDATED,
897 table=table,
898 name=constraint.name,
899 )
900 elif constraint.index_only:
901 issue, drift = _compare_index_only_unique(
902 cursor, model, constraint, actual[constraint.name], table
903 )
904 elif actual_def := actual[constraint.name].definition:
905 expected_def = _get_expected_unique_definition(cursor, model, constraint)
906 # Both sides are deparsed by pg_get_constraintdef → string equality.
907 if actual_def != expected_def:
908 if expected_def:
909 issue = f"definition differs: DB has {actual_def!r}, model expects {expected_def!r}"
910 else:
911 # Round-trip canonicalization couldn't complete; canonical
912 # model text is unavailable for the diagnostic.
913 issue = f"definition differs: DB has {actual_def!r}"
914 drift = ConstraintDrift(
915 kind=DriftKind.CHANGED,
916 table=table,
917 constraint=constraint,
918 model=model,
919 )
920
921 statuses.append(
922 ConstraintStatus(
923 name=constraint.name,
924 constraint_type=ConType.UNIQUE,
925 fields=list(constraint.fields),
926 issue=issue,
927 drift=drift,
928 )
929 )
930
931 # Detect renames by columns
932 rename_statuses, renamed_missing, renamed_extra = _detect_unique_renames(
933 cursor, missing, extra_names, actual, model, table
934 )
935 statuses.extend(rename_statuses)
936
937 for constraint in missing:
938 if constraint.name not in renamed_missing:
939 statuses.append(
940 ConstraintStatus(
941 name=constraint.name,
942 constraint_type=ConType.UNIQUE,
943 fields=list(constraint.fields),
944 issue="missing from database",
945 drift=ConstraintDrift(
946 kind=DriftKind.MISSING,
947 table=table,
948 constraint=constraint,
949 model=model,
950 ),
951 )
952 )
953
954 for name in extra_names:
955 if name not in renamed_extra:
956 # Index-only entries (from pg_index, not pg_constraint) need
957 # IndexDrift so the planner uses DROP INDEX, not DROP CONSTRAINT.
958 undeclared_drift: Drift
959 if name in actual_indexes:
960 undeclared_drift = IndexDrift(
961 kind=DriftKind.UNDECLARED, table=table, name=name
962 )
963 else:
964 undeclared_drift = ConstraintDrift(
965 kind=DriftKind.UNDECLARED, table=table, name=name
966 )
967 statuses.append(
968 ConstraintStatus(
969 name=name,
970 constraint_type=ConType.UNIQUE,
971 fields=actual[name].columns,
972 issue="not in model",
973 drift=undeclared_drift,
974 )
975 )
976
977 return statuses
978
979
980def _compare_check_constraints(
981 cursor: CursorWrapper, model: type[Model], db: TableState, table: str
982) -> list[ConstraintStatus]:
983 statuses: list[ConstraintStatus] = []
984 actual = {
985 k: v for k, v in db.constraints.items() if v.constraint_type == ConType.CHECK
986 }
987 model_constraints = [
988 c for c in model.model_options.constraints if isinstance(c, CheckConstraint)
989 ]
990 expected_names = {c.name for c in model_constraints}
991 extra_names = sorted(actual.keys() - expected_names)
992
993 missing: list[CheckConstraint] = []
994 for constraint in model_constraints:
995 if constraint.name not in actual:
996 missing.append(constraint)
997 continue
998
999 issue: str | None = None
1000 drift: ConstraintDrift | None = None
1001
1002 if not actual[constraint.name].validated:
1003 issue = "NOT VALID — needs validation"
1004 drift = ConstraintDrift(
1005 kind=DriftKind.UNVALIDATED,
1006 table=table,
1007 name=constraint.name,
1008 )
1009 elif actual_def := actual[constraint.name].definition:
1010 expected_def = _get_expected_check_definition(cursor, model, constraint)
1011 # Both sides are deparsed by pg_get_constraintdef → string equality.
1012 if actual_def != expected_def:
1013 if expected_def:
1014 issue = f"definition differs: DB has {actual_def!r}, model expects {expected_def!r}"
1015 else:
1016 # Round-trip canonicalization couldn't complete; canonical
1017 # model text is unavailable for the diagnostic.
1018 issue = f"definition differs: DB has {actual_def!r}"
1019 drift = ConstraintDrift(
1020 kind=DriftKind.CHANGED,
1021 table=table,
1022 constraint=constraint,
1023 model=model,
1024 )
1025
1026 statuses.append(
1027 ConstraintStatus(
1028 name=constraint.name,
1029 constraint_type=ConType.CHECK,
1030 fields=[],
1031 issue=issue,
1032 drift=drift,
1033 )
1034 )
1035
1036 # Detect renames by definition
1037 rename_statuses, renamed_missing, renamed_extra = _detect_check_renames(
1038 cursor, missing, extra_names, actual, model, table
1039 )
1040 statuses.extend(rename_statuses)
1041
1042 for constraint in missing:
1043 if constraint.name not in renamed_missing:
1044 statuses.append(
1045 ConstraintStatus(
1046 name=constraint.name,
1047 constraint_type=ConType.CHECK,
1048 fields=[],
1049 issue="missing from database",
1050 drift=ConstraintDrift(
1051 kind=DriftKind.MISSING,
1052 table=table,
1053 constraint=constraint,
1054 model=model,
1055 ),
1056 )
1057 )
1058
1059 # Build set of framework-owned temp NOT NULL check names so leftover
1060 # artifacts from a partially-completed SetNotNullFix are silently
1061 # ignored rather than surfaced as undeclared user constraints.
1062 internal_checks = {
1063 generate_notnull_check_name(table, f.column)
1064 for f in model._model_meta.local_fields
1065 if f.db_type() is not None
1066 }
1067
1068 for name in extra_names:
1069 if name not in renamed_extra and name not in internal_checks:
1070 statuses.append(
1071 ConstraintStatus(
1072 name=name,
1073 constraint_type=ConType.CHECK,
1074 fields=actual[name].columns,
1075 issue="not in model",
1076 drift=ConstraintDrift(
1077 kind=DriftKind.UNDECLARED,
1078 table=table,
1079 name=name,
1080 ),
1081 )
1082 )
1083
1084 return statuses
1085
1086
1087def _compare_foreign_keys(
1088 model: type[Model], db: TableState, table: str
1089) -> list[ConstraintStatus]:
1090 statuses: list[ConstraintStatus] = []
1091 actual = {
1092 k: v
1093 for k, v in db.constraints.items()
1094 if v.constraint_type == ConType.FOREIGN_KEY
1095 }
1096
1097 # Build expected FKs from model fields.
1098 # Key: shape (column, target_table, target_column)
1099 # Value: (field_name, constraint_name, expected_on_delete_clause, expected_confdeltype)
1100 expected_fks: dict[tuple[str, str, str], tuple[str, str, str, str]] = {}
1101 for f in model._model_meta.local_fields:
1102 if isinstance(f, ForeignKeyField) and f.db_constraint:
1103 assert f.name is not None
1104 to_table = f.target_field.model.model_options.db_table
1105 to_column = f.target_field.column
1106 constraint_name = generate_fk_constraint_name(
1107 table, f.column, to_table, to_column
1108 )
1109 on_delete_clause, confdeltype = sql_on_delete(f.remote_field.on_delete)
1110 expected_fks[(f.column, to_table, to_column)] = (
1111 f.name,
1112 constraint_name,
1113 on_delete_clause,
1114 confdeltype,
1115 )
1116
1117 # Build actual FKs from DB: shape → (constraint_name, ConstraintState)
1118 actual_fk_by_shape: dict[tuple[str, str, str], tuple[str, ConstraintState]] = {}
1119 for name, cs in actual.items():
1120 if cs.target_table and cs.target_column and cs.columns:
1121 actual_fk_by_shape[(cs.columns[0], cs.target_table, cs.target_column)] = (
1122 name,
1123 cs,
1124 )
1125
1126 matched_fk_names: set[str] = set()
1127 for key, (
1128 field_name,
1129 constraint_name,
1130 on_delete_clause,
1131 expected_action,
1132 ) in expected_fks.items():
1133 if match := actual_fk_by_shape.get(key):
1134 actual_name, cs = match
1135 matched_fk_names.add(actual_name)
1136
1137 issue: str | None = None
1138 drift: ForeignKeyDrift | None = None
1139
1140 # on_delete action mismatch — drop + re-add with new clause
1141 if (
1142 cs.on_delete_action is not None
1143 and cs.on_delete_action != expected_action
1144 ):
1145 issue = (
1146 f"on_delete action differs "
1147 f"({cs.on_delete_action!r} → {expected_action!r})"
1148 )
1149 col, to_table, to_column = key
1150 drift = ForeignKeyDrift(
1151 kind=DriftKind.CHANGED,
1152 table=table,
1153 name=actual_name,
1154 column=col,
1155 target_table=to_table,
1156 target_column=to_column,
1157 on_delete_clause=on_delete_clause,
1158 actual_action=cs.on_delete_action,
1159 expected_action=expected_action,
1160 )
1161 elif not cs.validated:
1162 issue = "NOT VALID — needs validation"
1163 drift = ForeignKeyDrift(
1164 kind=DriftKind.UNVALIDATED,
1165 table=table,
1166 name=actual_name,
1167 )
1168
1169 statuses.append(
1170 ConstraintStatus(
1171 name=actual_name,
1172 constraint_type=ConType.FOREIGN_KEY,
1173 fields=[key[0]],
1174 issue=issue,
1175 drift=drift,
1176 )
1177 )
1178 else:
1179 col, to_table, to_column = key
1180 statuses.append(
1181 ConstraintStatus(
1182 name=f"{field_name} → {to_table}.{to_column}",
1183 constraint_type=ConType.FOREIGN_KEY,
1184 fields=[col],
1185 issue="missing from database",
1186 drift=ForeignKeyDrift(
1187 kind=DriftKind.MISSING,
1188 table=table,
1189 name=constraint_name,
1190 column=col,
1191 target_table=to_table,
1192 target_column=to_column,
1193 on_delete_clause=on_delete_clause,
1194 ),
1195 )
1196 )
1197
1198 for name in sorted(actual.keys() - matched_fk_names):
1199 cs = actual[name]
1200 statuses.append(
1201 ConstraintStatus(
1202 name=name,
1203 constraint_type=ConType.FOREIGN_KEY,
1204 fields=cs.columns,
1205 issue=f"not in model (→ {cs.target_table}.{cs.target_column})",
1206 drift=ForeignKeyDrift(
1207 kind=DriftKind.UNDECLARED,
1208 table=table,
1209 name=name,
1210 ),
1211 )
1212 )
1213
1214 return statuses
1215
1216
1217def generate_notnull_check_name(table: str, column: str) -> str:
1218 """Generate a hashed name for the temporary NOT NULL check constraint.
1219
1220 Used by SetNotNullFix for the CHECK NOT VALID → VALIDATE → SET NOT NULL
1221 pattern, and by analysis to recognize (and ignore) leftover temp checks.
1222 """
1223 from ..utils import generate_identifier_name
1224
1225 return generate_identifier_name(table, [column], "_notnull")
1226
1227
1228def generate_fk_constraint_name(
1229 table: str, column: str, target_table: str, target_column: str
1230) -> str:
1231 """Generate a deterministic FK constraint name.
1232
1233 Uses the same naming algorithm as the schema editor so that
1234 convergence-created FKs match migration-created ones.
1235 """
1236 from ..utils import generate_identifier_name, split_identifier
1237
1238 _, target_table_name = split_identifier(target_table)
1239 suffix = f"_fk_{target_table_name}_{target_column}"
1240 return generate_identifier_name(table, [column], suffix)
1241
1242
1243def _detect_unique_renames(
1244 cursor: CursorWrapper,
1245 missing: list[UniqueConstraint],
1246 extra_names: list[str],
1247 actual_dict: dict[str, ConstraintState],
1248 model: type[Model],
1249 table: str,
1250) -> tuple[list[ConstraintStatus], set[str], set[str]]:
1251 """Match missing and extra unique constraints by structure.
1252
1253 Constraint-backed (not index_only): matched by resolved column tuple.
1254 Index-only (condition/expression/opclass): matched by canonical
1255 (round-tripped) index definition, which captures the full semantics
1256 including WHERE clauses, opclasses, and expressions.
1257 """
1258 statuses: list[ConstraintStatus] = []
1259 renamed_missing: set[str] = set()
1260 renamed_extra: set[str] = set()
1261
1262 # Phase 1: Field-based — match by resolved column tuple.
1263 # Covers both constraint-backed and index-only field-based constraints.
1264 missing_by_cols: dict[tuple[str, ...], list[UniqueConstraint]] = {}
1265 for constraint in missing:
1266 if not constraint.fields:
1267 continue
1268 cols = tuple(
1269 model._model_meta.get_forward_field(field_name).column
1270 for field_name in constraint.fields
1271 )
1272 missing_by_cols.setdefault(cols, []).append(constraint)
1273
1274 extra_by_cols: dict[tuple[str, ...], list[str]] = {}
1275 for name in extra_names:
1276 cols = tuple(actual_dict[name].columns)
1277 if cols:
1278 extra_by_cols.setdefault(cols, []).append(name)
1279
1280 for cols, m_list in missing_by_cols.items():
1281 e_list = extra_by_cols.get(cols)
1282 if e_list and len(m_list) == 1 and len(e_list) == 1:
1283 constraint = m_list[0]
1284 old_name = e_list[0]
1285 # For index-only uniques, same columns isn't enough — the
1286 # condition or opclass may have changed. Verify the full
1287 # definition matches before accepting a rename, otherwise
1288 # let both sides fall through as separate missing + undeclared.
1289 if constraint.index_only:
1290 old_def = actual_dict[old_name].definition
1291 if not old_def:
1292 continue
1293 expected_tail = _canonicalize_index_def(
1294 cursor,
1295 model,
1296 expressions=constraint.expressions,
1297 fields_orders=[(f, "") for f in constraint.fields],
1298 opclasses=list(constraint.opclasses)
1299 if constraint.opclasses
1300 else [],
1301 condition=constraint.condition,
1302 include=constraint.include,
1303 unique=True,
1304 )
1305 if _index_def_tail(old_def) != expected_tail:
1306 continue
1307 DriftType = IndexDrift if constraint.index_only else ConstraintDrift
1308 statuses.append(
1309 ConstraintStatus(
1310 name=constraint.name,
1311 constraint_type=ConType.UNIQUE,
1312 fields=list(constraint.fields),
1313 issue=f"rename from {old_name}",
1314 drift=DriftType(
1315 kind=DriftKind.RENAMED,
1316 table=table,
1317 old_name=old_name,
1318 new_name=constraint.name,
1319 ),
1320 )
1321 )
1322 renamed_missing.add(constraint.name)
1323 renamed_extra.add(old_name)
1324
1325 # Phase 2: Expression-based — match by canonical (round-tripped) index
1326 # body. Build the cheap already-introspected extras side first so we can
1327 # skip the per-missing round-trip on the steady-state path.
1328 extra_by_def: dict[str, list[str]] = {}
1329 for name in extra_names:
1330 if name in renamed_extra:
1331 continue
1332 defn = actual_dict[name].definition
1333 if defn:
1334 extra_by_def.setdefault(_index_def_tail(defn), []).append(name)
1335
1336 missing_by_def: dict[str, list[UniqueConstraint]] = {}
1337 if extra_by_def:
1338 for constraint in missing:
1339 if constraint.fields or constraint.name in renamed_missing:
1340 continue
1341 # constraint.fields is empty here (filtered above), so this is
1342 # the expression-based path — fields_orders is unused.
1343 expected_tail = _canonicalize_index_def(
1344 cursor,
1345 model,
1346 expressions=constraint.expressions,
1347 opclasses=list(constraint.opclasses),
1348 condition=constraint.condition,
1349 include=constraint.include,
1350 unique=True,
1351 )
1352 if not expected_tail:
1353 # See _compare_indexes for why we skip the sentinel.
1354 continue
1355 missing_by_def.setdefault(expected_tail, []).append(constraint)
1356
1357 for tail, m_list in missing_by_def.items():
1358 e_list = extra_by_def.get(tail)
1359 if e_list and len(m_list) == 1 and len(e_list) == 1:
1360 constraint = m_list[0]
1361 old_name = e_list[0]
1362 # Index-only uniques live as indexes, not constraints, so
1363 # emit IndexDrift so the planner uses ALTER INDEX RENAME.
1364 statuses.append(
1365 ConstraintStatus(
1366 name=constraint.name,
1367 constraint_type=ConType.UNIQUE,
1368 fields=list(constraint.fields),
1369 issue=f"rename from {old_name}",
1370 drift=IndexDrift(
1371 kind=DriftKind.RENAMED,
1372 table=table,
1373 old_name=old_name,
1374 new_name=constraint.name,
1375 ),
1376 )
1377 )
1378 renamed_missing.add(constraint.name)
1379 renamed_extra.add(old_name)
1380
1381 return statuses, renamed_missing, renamed_extra
1382
1383
1384def _detect_check_renames(
1385 cursor: CursorWrapper,
1386 missing: list[CheckConstraint],
1387 extra_names: list[str],
1388 actual_dict: dict[str, ConstraintState],
1389 model: type[Model],
1390 table: str,
1391) -> tuple[list[ConstraintStatus], set[str], set[str]]:
1392 """Match missing and extra check constraints by canonical definition."""
1393 statuses: list[ConstraintStatus] = []
1394 renamed_missing: set[str] = set()
1395 renamed_extra: set[str] = set()
1396
1397 # Skip the round-trip canonicalization loop if there are no extras to
1398 # potentially match — the steady-state path with no drift.
1399 extra_by_def: dict[str, list[str]] = {}
1400 for name in extra_names:
1401 if definition := actual_dict[name].definition:
1402 extra_by_def.setdefault(definition, []).append(name)
1403
1404 missing_by_def: dict[str, list[CheckConstraint]] = {}
1405 if extra_by_def:
1406 for constraint in missing:
1407 expected_def = _get_expected_check_definition(cursor, model, constraint)
1408 if not expected_def:
1409 # Canonicalization failed; bucketing under "" would conflate
1410 # multiple sentinel-failing constraints and disable rename
1411 # detection for the rest.
1412 continue
1413 missing_by_def.setdefault(expected_def, []).append(constraint)
1414
1415 for definition, m_list in missing_by_def.items():
1416 e_list = extra_by_def.get(definition)
1417 if e_list and len(m_list) == 1 and len(e_list) == 1:
1418 constraint = m_list[0]
1419 old_name = e_list[0]
1420 statuses.append(
1421 ConstraintStatus(
1422 name=constraint.name,
1423 constraint_type=ConType.CHECK,
1424 fields=[],
1425 issue=f"rename from {old_name}",
1426 drift=ConstraintDrift(
1427 kind=DriftKind.RENAMED,
1428 table=table,
1429 old_name=old_name,
1430 new_name=constraint.name,
1431 ),
1432 )
1433 )
1434 renamed_missing.add(constraint.name)
1435 renamed_extra.add(old_name)
1436
1437 return statuses, renamed_missing, renamed_extra
1438
1439
1440def _compare_index_only_unique(
1441 cursor: CursorWrapper,
1442 model: type[Model],
1443 constraint: UniqueConstraint,
1444 actual_state: ConstraintState,
1445 table: str,
1446) -> tuple[str | None, ConstraintDrift | None]:
1447 """Compare an index-only unique constraint against the DB.
1448
1449 Index-only variants (condition, expressions, opclasses) live as unique
1450 indexes in PostgreSQL, not pg_constraint rows. Their ConstraintState
1451 comes from the pg_index query path with a pg_get_indexdef definition.
1452 """
1453 actual_def = actual_state.definition
1454 if not actual_def:
1455 return None, None
1456
1457 issue = _compare_canonical_index(
1458 cursor=cursor,
1459 model=model,
1460 expressions=constraint.expressions,
1461 fields_orders=[(f, "") for f in constraint.fields],
1462 opclasses=list(constraint.opclasses),
1463 condition=constraint.condition,
1464 include=constraint.include,
1465 actual_def=actual_def,
1466 unique=True,
1467 )
1468 if issue:
1469 changed = ConstraintDrift(
1470 kind=DriftKind.CHANGED, table=table, constraint=constraint, model=model
1471 )
1472 return issue, changed
1473
1474 return None, None
1475
1476
1477def _compare_canonical_index(
1478 *,
1479 cursor: CursorWrapper,
1480 model: type[Model],
1481 expressions: tuple[Expression | ReplaceableExpression, ...],
1482 fields_orders: list[tuple[str, str]],
1483 opclasses: list[str],
1484 condition: Q | None,
1485 include: tuple[str, ...] | None,
1486 actual_def: str,
1487 unique: bool = False,
1488) -> str | None:
1489 """Compare a model index/constraint against pg_get_indexdef text.
1490
1491 Round-trips the model side through Postgres so both sides come from
1492 pg_get_indexdef, then string-compares the canonical `USING ...` bodies.
1493
1494 Returns an issue string if definitions differ, None if they match.
1495 """
1496 expected_tail = _canonicalize_index_def(
1497 cursor,
1498 model,
1499 expressions=expressions,
1500 fields_orders=fields_orders,
1501 opclasses=opclasses,
1502 condition=condition,
1503 include=include,
1504 unique=unique,
1505 )
1506 actual_tail = _index_def_tail(actual_def)
1507 if not expected_tail:
1508 # Round-trip canonicalization couldn't complete (model SQL
1509 # incompatible with live shape). Report drift with the actual text;
1510 # the canonical model text is unavailable for the diagnostic.
1511 return f"definition differs: DB has {actual_tail!r}"
1512
1513 if actual_tail != expected_tail:
1514 return (
1515 f"definition differs: DB has {actual_tail!r}, "
1516 f"model expects {expected_tail!r}"
1517 )
1518 return None
1519
1520
1521# Round-trip canonicalization: feed model-side SQL to Postgres on a
1522# session-private temp table, read back via pg_get_*.
1523_CANON_TABLE = "_plain_canon"
1524_CANON_CONSTRAINT = "_c"
1525_CANON_INDEX = "_canon_ix"
1526
1527# Errors raised by Postgres when the model SQL is incompatible with the live
1528# table shape (unmigrated column types, references to dropped columns, etc.).
1529# Helpers catch these and return "" so drift is still reported via inequality.
1530# DataError covers literal-cast mismatches (e.g. text default on int column);
1531# NotSupportedError covers the rare PG-side "this combination isn't allowed"
1532# rejection. Both are narrow enough that catching the parent class is fine.
1533# ProgrammingError is intentionally narrowed to specific 42xxx subclasses —
1534# privilege failures (InsufficientPrivilege, also a ProgrammingError) and
1535# plain-side syntax bugs must propagate so users get a clear diagnostic
1536# instead of silent drift noise.
1537_CANON_FALLBACK_ERRORS: tuple[type[Exception], ...] = (
1538 psycopg.errors.UndefinedColumn,
1539 psycopg.errors.UndefinedFunction,
1540 psycopg.errors.UndefinedObject,
1541 psycopg.errors.UndefinedTable,
1542 psycopg.errors.InvalidColumnReference,
1543 psycopg.errors.InvalidObjectDefinition,
1544 psycopg.errors.InvalidTableDefinition,
1545 psycopg.errors.DatatypeMismatch,
1546 psycopg.errors.WrongObjectType,
1547 psycopg.errors.AmbiguousColumn,
1548 psycopg.errors.DataError,
1549 psycopg.errors.NotSupportedError,
1550)
1551
1552
1553class ReadOnlyConnectionError(RuntimeError):
1554 """Raised when convergence analysis runs on a read-only connection.
1555
1556 Analysis canonicalizes the model side of each comparison by round-tripping
1557 SQL through a session-private temp table. That requires DDL, which is
1558 rejected on standby connections and inside `read_only()` blocks.
1559 """
1560
1561
1562@contextmanager
1563def _canon_table(cursor: CursorWrapper, model: type[Model]) -> Iterator[None]:
1564 """Set up a session-private temp table mirroring the model's real table.
1565
1566 `cursor.connection.transaction()` issues a SAVEPOINT when nested (or BEGIN
1567 when run in autocommit). Either way, a model SQL statement incompatible
1568 with the live column shape (e.g. a CHECK referencing a column whose live
1569 type differs from what the model now declares) rolls back to this scope
1570 rather than poisoning the surrounding analyze transaction. Helpers catch
1571 psycopg errors and fall back to a sentinel — drift still gets reported,
1572 just without the canonical model text.
1573
1574 The trailing DROP is schema-qualified to `pg_temp` so a stray real table
1575 sharing the name (in the user's own schema) can't be hit by mistake. A
1576 single connection is always single-threaded, so reusing the name across
1577 helpers is safe.
1578 """
1579 table = quote_name(model.model_options.db_table)
1580 try:
1581 with cursor.connection.transaction():
1582 cursor.execute(f"CREATE TEMP TABLE {_CANON_TABLE} (LIKE {table})")
1583 yield
1584 cursor.execute(f"DROP TABLE pg_temp.{_CANON_TABLE}")
1585 except psycopg.errors.ReadOnlySqlTransaction as exc:
1586 raise ReadOnlyConnectionError(
1587 "Convergence analysis requires write access — it canonicalizes "
1588 "model SQL by creating a session-private temp table. The current "
1589 "connection rejected DDL (read-only transaction or standby). Run "
1590 "analysis against a primary/writable connection."
1591 ) from exc
1592
1593
1594def _canonicalize_constraint_def(
1595 cursor: CursorWrapper, model: type[Model], constraint_clause: str
1596) -> str:
1597 """Round-trip a constraint clause through Postgres so both sides of the
1598 comparison are deparsed by pg_get_constraintdef — string equality then
1599 covers `IN` vs `= ANY (ARRAY[...])`, redundant parens, type cast drift,
1600 INCLUDE column ordering, etc.
1601
1602 Returns "" if the model SQL is incompatible with the live table shape
1603 (e.g. unmigrated column-type drift). Drift still gets reported via the
1604 inequality with the actual live definition; only the canonical model
1605 text is omitted from the diagnostic.
1606 """
1607 try:
1608 with _canon_table(cursor, model):
1609 # Add as validated: the temp table is empty so the implicit scan is
1610 # instant. NOT VALID would leave a trailing " NOT VALID" suffix in
1611 # pg_get_constraintdef that the live constraint won't have.
1612 cursor.execute(
1613 f"ALTER TABLE {_CANON_TABLE} "
1614 f"ADD CONSTRAINT {_CANON_CONSTRAINT} {constraint_clause}"
1615 )
1616 cursor.execute(
1617 "SELECT pg_get_constraintdef(c.oid) FROM pg_constraint c "
1618 "WHERE c.conname = %s "
1619 "AND c.conrelid = (SELECT oid FROM pg_class WHERE relname = %s "
1620 "AND relnamespace = pg_my_temp_schema())",
1621 [_CANON_CONSTRAINT, _CANON_TABLE],
1622 )
1623 row = cursor.fetchone()
1624 return row[0] if row else ""
1625 except _CANON_FALLBACK_ERRORS:
1626 return ""
1627
1628
1629def _get_expected_check_definition(
1630 cursor: CursorWrapper, model: type[Model], constraint: CheckConstraint
1631) -> str:
1632 check_sql = compile_expression_sql(model, constraint.check)
1633 return _canonicalize_constraint_def(cursor, model, f"CHECK ({check_sql})")
1634
1635
1636def _canonicalize_index_def(
1637 cursor: CursorWrapper,
1638 model: type[Model],
1639 *,
1640 expressions: tuple[Expression | ReplaceableExpression, ...] = (),
1641 fields_orders: list[tuple[str, str]] | None = None,
1642 opclasses: list[str] | None = None,
1643 condition: Q | None = None,
1644 include: tuple[str, ...] | None = None,
1645 unique: bool = False,
1646) -> str:
1647 """Round-trip an index through Postgres and return its canonical body.
1648
1649 Returns the `USING ... [INCLUDE (...)] [WHERE (...)]` tail of pg_get_indexdef,
1650 safe to compare directly against `_index_def_tail(actual_def)` from the
1651 DB side. The `CREATE [UNIQUE] INDEX <name> ON <table>` prefix is stripped
1652 here so callers don't have to.
1653
1654 Returns "" if the model SQL is incompatible with the live table shape;
1655 comparison sites then see inequality and report drift without the
1656 canonical model text.
1657 """
1658 if expressions:
1659 columns_sql = compile_index_expressions_sql(model, expressions)
1660 else:
1661 col_parts: list[str] = []
1662 for i, (field_name, suffix) in enumerate(fields_orders or []):
1663 field = model._model_meta.get_forward_field(field_name)
1664 col = quote_name(field.column)
1665 if opclasses:
1666 col = f"{col} {opclasses[i]}"
1667 if suffix:
1668 col = f"{col} {suffix}"
1669 col_parts.append(col)
1670 columns_sql = ", ".join(col_parts)
1671
1672 where_sql = ""
1673 if condition is not None:
1674 where_sql = f" WHERE ({compile_expression_sql(model, condition)})"
1675 include_sql = build_include_sql(model, include or ())
1676 create_kw = "CREATE UNIQUE INDEX" if unique else "CREATE INDEX"
1677
1678 try:
1679 with _canon_table(cursor, model):
1680 cursor.execute(
1681 f"{create_kw} {_CANON_INDEX} ON {_CANON_TABLE} "
1682 f"({columns_sql}){include_sql}{where_sql}"
1683 )
1684 cursor.execute(
1685 "SELECT pg_get_indexdef(c.oid) FROM pg_class c "
1686 "WHERE c.relname = %s AND c.relnamespace = pg_my_temp_schema()",
1687 [_CANON_INDEX],
1688 )
1689 row = cursor.fetchone()
1690 return _index_def_tail(row[0]) if row else ""
1691 except _CANON_FALLBACK_ERRORS:
1692 return ""
1693
1694
1695def _index_def_tail(definition: str) -> str:
1696 """Strip the `CREATE [UNIQUE] INDEX <name> ON <schema>.<table>` prefix
1697 from a pg_get_indexdef output, leaving the canonical body that's safe to
1698 compare across different index names/tables."""
1699 using_pos = definition.find("USING ")
1700 return definition[using_pos:] if using_pos >= 0 else definition
1701
1702
1703def _get_expected_unique_definition(
1704 cursor: CursorWrapper, model: type[Model], constraint: UniqueConstraint
1705) -> str:
1706 """Canonical UNIQUE definition for the model's constraint, as Postgres
1707 prints it.
1708
1709 PostgreSQL only stores field-based unique constraints (with optional
1710 INCLUDE and DEFERRABLE) in pg_constraint. Expression-based, conditional,
1711 and opclass constraints remain as indexes only — those are compared via
1712 the index-definition path.
1713 """
1714 columns_sql = ", ".join(
1715 quote_name(model._model_meta.get_forward_field(f).column)
1716 for f in constraint.fields
1717 )
1718 include_sql = build_include_sql(model, constraint.include)
1719 defer_sql = deferrable_sql(constraint.deferrable)
1720 clause = f"UNIQUE ({columns_sql}){include_sql}{defer_sql}"
1721 return _canonicalize_constraint_def(cursor, model, clause)