1from __future__ import annotations
  2
  3from abc import ABC, abstractmethod
  4from dataclasses import dataclass
  5from typing import TYPE_CHECKING, ClassVar
  6
  7from ..constraints import BaseConstraint, CheckConstraint, UniqueConstraint
  8from ..db import get_connection
  9from ..dialect import quote_name
 10from ..indexes import Index
 11
 12if TYPE_CHECKING:
 13    from ..base import Model
 14
 15
 16def _execute_and_commit(sql: str) -> None:
 17    """Execute SQL and commit. Rolls back on failure so the connection stays usable."""
 18    conn = get_connection()
 19    try:
 20        with conn.cursor() as cursor:
 21            cursor.execute(sql)
 22        conn.commit()
 23    except Exception:
 24        conn.rollback()
 25        raise
 26
 27
 28def _execute_autocommit(sql: str) -> None:
 29    """Execute SQL in autocommit mode (required for CONCURRENTLY operations).
 30
 31    Commits any pending transaction first, since Postgres doesn't allow
 32    switching to autocommit while a transaction is active.
 33    """
 34    conn = get_connection()
 35    if conn.in_atomic_block:
 36        raise RuntimeError("Cannot use CONCURRENTLY inside an atomic block")
 37    old_autocommit = conn.get_autocommit()
 38    if not old_autocommit:
 39        conn.commit()
 40        conn.set_autocommit(True)
 41    try:
 42        with conn.cursor() as cursor:
 43            cursor.execute(sql)
 44    finally:
 45        if not old_autocommit:
 46            conn.set_autocommit(False)
 47
 48
 49class Fix(ABC):
 50    """Concrete executable SQL operation for convergence."""
 51
 52    pass_order: ClassVar[int]
 53
 54    @abstractmethod
 55    def describe(self) -> str: ...
 56
 57    @abstractmethod
 58    def apply(self) -> str: ...
 59
 60
 61@dataclass
 62class RebuildIndexFix(Fix):
 63    """Drop an INVALID index and recreate it CONCURRENTLY."""
 64
 65    pass_order = 0
 66
 67    table: str
 68    index: Index
 69    model: type[Model]
 70
 71    def describe(self) -> str:
 72        return f"{self.table}: rebuild index {self.index.name}"
 73
 74    def apply(self) -> str:
 75        drop_sql = f"DROP INDEX CONCURRENTLY IF EXISTS {quote_name(self.index.name)}"
 76        _execute_autocommit(drop_sql)
 77        create_sql = self.index.to_sql(self.model)
 78        _execute_autocommit(create_sql)
 79        return f"{drop_sql}; {create_sql}"
 80
 81
 82@dataclass
 83class RenameIndexFix(Fix):
 84    """Rename an index (catalog-only, instant)."""
 85
 86    pass_order = 1
 87
 88    table: str
 89    old_name: str
 90    new_name: str
 91
 92    def describe(self) -> str:
 93        return f"{self.table}: rename index {self.old_name} -> {self.new_name}"
 94
 95    def apply(self) -> str:
 96        sql = f"ALTER INDEX {quote_name(self.old_name)} RENAME TO {quote_name(self.new_name)}"
 97        _execute_and_commit(sql)
 98        return sql
 99
100
@dataclass
class CreateIndexFix(Fix):
    """Create a missing index using CONCURRENTLY (doesn't block writes).

    The statement comes from ``index.to_sql(model)`` and is executed in
    autocommit mode.
    """

    pass_order = 1

    table: str
    index: Index
    model: type[Model]

    def describe(self) -> str:
        """Return a one-line summary naming the table and the index."""
        index_name = self.index.name
        return f"{self.table}: create index {index_name}"

    def apply(self) -> str:
        """Execute the index's CREATE statement and return it."""
        statement = self.index.to_sql(self.model)
        _execute_autocommit(statement)
        return statement
118
119
@dataclass
class AddConstraintFix(Fix):
    """Add a missing constraint.

    Check constraints are added NOT VALID to avoid a table scan.
    Unique constraints are built as CREATE UNIQUE INDEX CONCURRENTLY and
    then attached USING INDEX, to avoid blocking writes.
    Any other constraint type is added with its plain ``to_sql`` output.
    """

    pass_order = 2

    table: str
    constraint: BaseConstraint
    model: type[Model]

    def describe(self) -> str:
        """Return a one-line summary; check constraints are tagged "(NOT VALID)"."""
        summary = f"{self.table}: add constraint {self.constraint.name}"
        if isinstance(self.constraint, CheckConstraint):
            summary += " (NOT VALID)"
        return summary

    def apply(self) -> str:
        """Dispatch on the constraint type and return the SQL that was executed."""
        is_unique = isinstance(self.constraint, UniqueConstraint)
        handler = self._apply_unique if is_unique else self._apply_other
        return handler()

    def _apply_unique(self) -> str:
        """Create the unique index concurrently, then attach it as a constraint."""
        constraint = self.constraint
        assert isinstance(constraint, UniqueConstraint)

        # First the unique index itself, built concurrently (non-blocking).
        index_sql = constraint.to_sql(self.model, concurrently=True)
        _execute_autocommit(index_sql)

        # Attaching is only done for variants PostgreSQL accepts.  Partial
        # indexes, expression indexes, and non-default operator class
        # indexes cannot be attached as constraints; they stay as unique
        # indexes (same enforcement, no pg_constraint row).
        if constraint.index_only:
            return index_sql

        attach_sql = constraint.to_attach_sql(self.model)
        try:
            _execute_and_commit(attach_sql)
        except Exception:
            # The attach failed, so remove the now-orphaned index before
            # letting the error propagate.
            orphan = quote_name(constraint.name)
            _execute_autocommit(f"DROP INDEX CONCURRENTLY IF EXISTS {orphan}")
            raise

        return f"{index_sql}; {attach_sql}"

    def _apply_other(self) -> str:
        """Add a non-unique constraint (check constraints as NOT VALID) and commit."""
        constraint = self.constraint
        statement = (
            constraint.to_sql(self.model, not_valid=True)
            if isinstance(constraint, CheckConstraint)
            else constraint.to_sql(self.model)
        )
        _execute_and_commit(statement)
        return statement
177
178
@dataclass
class AddForeignKeyFix(Fix):
    """Add a missing FK constraint using NOT VALID, then validate immediately.

    Step 1: ADD CONSTRAINT ... NOT VALID (SHARE ROW EXCLUSIVE, no scan)
    Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)

    Both steps run in a single apply() because the validate lock is weaker
    than the add lock — there's no benefit to deferring validation.  Each
    step is committed on its own, so if validation fails the NOT VALID
    constraint added in step 1 remains in place.
    """

    pass_order = 2

    # Referencing side: table, constraint name, and FK column.
    table: str
    constraint_name: str
    column: str
    # Referenced side: target table and column.
    target_table: str
    target_column: str

    def describe(self) -> str:
        """Return a one-line summary of the form
        ``<table>: add FK <name> (<column> -> <target_table>.<target_column>)``.
        """
        # The separator keeps the column and the referenced table from
        # running together in the summary.
        reference = f"{self.column} -> {self.target_table}.{self.target_column}"
        return f"{self.table}: add FK {self.constraint_name} ({reference})"

    def apply(self) -> str:
        """Add the FK as NOT VALID, validate it, and return both statements.

        Each statement is executed and committed separately; the returned
        string is the two statements joined by "; ".
        """
        add_sql = (
            f"ALTER TABLE {quote_name(self.table)}"
            f" ADD CONSTRAINT {quote_name(self.constraint_name)}"
            f" FOREIGN KEY ({quote_name(self.column)})"
            f" REFERENCES {quote_name(self.target_table)} ({quote_name(self.target_column)})"
            f" DEFERRABLE INITIALLY DEFERRED"
            f" NOT VALID"
        )
        _execute_and_commit(add_sql)

        validate_sql = (
            f"ALTER TABLE {quote_name(self.table)}"
            f" VALIDATE CONSTRAINT {quote_name(self.constraint_name)}"
        )
        _execute_and_commit(validate_sql)

        return f"{add_sql}; {validate_sql}"
219
220
@dataclass
class SetNotNullFix(Fix):
    """Enforce NOT NULL via CHECK NOT VALID -> VALIDATE -> SET NOT NULL.

    A bare SET NOT NULL acquires ACCESS EXCLUSIVE and scans the whole table
    while holding it.  With a validated IS NOT NULL check constraint already
    in place, Postgres (12+) skips the scan and the lock is brief.

    Transaction boundaries are chosen to keep lock windows narrow:

      1. ADD CHECK (col IS NOT NULL) NOT VALID  — catalog-only, brief lock
      2. VALIDATE CONSTRAINT                    — SHARE UPDATE EXCLUSIVE scan
      3. SET NOT NULL + DROP temp check         — atomic, instant catalog ops

    The two statements of step 3 share a single commit so no orphaned temp
    check can remain after the column becomes NOT NULL.  If an earlier step
    fails, the leftover temp check is dropped at the start of the next
    apply() (the analysis step is also expected to ignore these
    framework-owned temp checks so they never block sync).
    """

    pass_order = 2

    table: str
    column: str

    def describe(self) -> str:
        """Return a one-line summary naming the table and the column."""
        target = self.column
        return f"{self.table}: set NOT NULL on {target}"

    def apply(self) -> str:
        """Run the three-step sequence and return the four main statements.

        The initial cleanup statement is executed but is not part of the
        returned string.
        """
        from .analysis import generate_notnull_check_name

        quoted_table = quote_name(self.table)
        quoted_column = quote_name(self.column)
        quoted_check = quote_name(
            generate_notnull_check_name(self.table, self.column)
        )
        alter_table = f"ALTER TABLE {quoted_table}"

        # A previous failed run may have left the temp constraint behind.
        _execute_and_commit(f"{alter_table} DROP CONSTRAINT IF EXISTS {quoted_check}")

        # Step 1: NOT VALID check — no scan, brief lock.
        add_sql = f"{alter_table} ADD CONSTRAINT {quoted_check} CHECK ({quoted_column} IS NOT NULL) NOT VALID"
        _execute_and_commit(add_sql)

        # Step 2: validation scan under SHARE UPDATE EXCLUSIVE (non-blocking).
        validate_sql = f"{alter_table} VALIDATE CONSTRAINT {quoted_check}"
        _execute_and_commit(validate_sql)

        # Step 3: SET NOT NULL and the temp-check drop go into one commit.
        # Both are instant catalog operations (the validated check lets
        # SET NOT NULL skip the scan), and committing them together means
        # the temp check cannot outlive a successful SET NOT NULL.
        set_sql = f"{alter_table} ALTER COLUMN {quoted_column} SET NOT NULL"
        drop_sql = f"{alter_table} DROP CONSTRAINT {quoted_check}"
        connection = get_connection()
        try:
            with connection.cursor() as cur:
                for statement in (set_sql, drop_sql):
                    cur.execute(statement)
            connection.commit()
        except Exception:
            connection.rollback()
            raise

        return f"{add_sql}; {validate_sql}; {set_sql}; {drop_sql}"
286
287
@dataclass
class DropNotNullFix(Fix):
    """Remove NOT NULL from a column (model now allows NULL).

    DROP NOT NULL is a catalog-only change — no data scan, instant.
    """

    pass_order = 2

    table: str
    column: str

    def describe(self) -> str:
        """Return a one-line summary naming the table and the column."""
        target = self.column
        return f"{self.table}: drop NOT NULL on {target}"

    def apply(self) -> str:
        """Run ALTER COLUMN ... DROP NOT NULL, commit, and return the statement."""
        quoted_table = quote_name(self.table)
        quoted_column = quote_name(self.column)
        statement = (
            f"ALTER TABLE {quoted_table} ALTER COLUMN {quoted_column} DROP NOT NULL"
        )
        _execute_and_commit(statement)
        return statement
307
308
@dataclass
class RenameConstraintFix(Fix):
    """Rename a constraint (catalog-only, instant).

    For unique constraints, Postgres automatically renames the backing index.
    """

    pass_order = 2

    table: str
    old_name: str
    new_name: str

    def describe(self) -> str:
        """Return a one-line summary showing the old and new constraint names."""
        rename = f"{self.old_name} -> {self.new_name}"
        return f"{self.table}: rename constraint {rename}"

    def apply(self) -> str:
        """Run ALTER TABLE ... RENAME CONSTRAINT, commit, and return the statement."""
        quoted_table = quote_name(self.table)
        current = quote_name(self.old_name)
        target = quote_name(self.new_name)
        statement = (
            f"ALTER TABLE {quoted_table} RENAME CONSTRAINT {current} TO {target}"
        )
        _execute_and_commit(statement)
        return statement
329
330
@dataclass
class ValidateConstraintFix(Fix):
    """Validate a NOT VALID constraint (SHARE UPDATE EXCLUSIVE — doesn't block writes)."""

    pass_order = 3

    table: str
    name: str

    def describe(self) -> str:
        """Return a one-line summary naming the table and the constraint."""
        constraint_name = self.name
        return f"{self.table}: validate constraint {constraint_name}"

    def apply(self) -> str:
        """Run ALTER TABLE ... VALIDATE CONSTRAINT, commit, and return the statement."""
        quoted_table = quote_name(self.table)
        quoted_constraint = quote_name(self.name)
        statement = (
            f"ALTER TABLE {quoted_table} VALIDATE CONSTRAINT {quoted_constraint}"
        )
        _execute_and_commit(statement)
        return statement
347
348
@dataclass
class DropConstraintFix(Fix):
    """Drop a named constraint from a table with ALTER TABLE ... DROP CONSTRAINT."""

    pass_order = 4

    table: str
    name: str

    def describe(self) -> str:
        """Return a one-line summary naming the table and the constraint."""
        constraint_name = self.name
        return f"{self.table}: drop constraint {constraint_name}"

    def apply(self) -> str:
        """Run the DROP CONSTRAINT statement, commit, and return it."""
        quoted_table = quote_name(self.table)
        quoted_constraint = quote_name(self.name)
        statement = f"ALTER TABLE {quoted_table} DROP CONSTRAINT {quoted_constraint}"
        _execute_and_commit(statement)
        return statement
363
364
@dataclass
class DropIndexFix(Fix):
    """Drop an index with DROP INDEX CONCURRENTLY IF EXISTS, run in autocommit mode."""

    pass_order = 5

    table: str
    name: str

    def describe(self) -> str:
        """Return a one-line summary naming the table and the index."""
        index_name = self.name
        return f"{self.table}: drop index {index_name}"

    def apply(self) -> str:
        """Execute the DROP INDEX statement and return it."""
        quoted_index = quote_name(self.name)
        statement = f"DROP INDEX CONCURRENTLY IF EXISTS {quoted_index}"
        _execute_autocommit(statement)
        return statement