v0.150.0
  1from __future__ import annotations
  2
  3from abc import ABC, abstractmethod
  4from dataclasses import dataclass
  5from typing import TYPE_CHECKING, ClassVar
  6
  7import psycopg
  8import psycopg.sql
  9
 10from plain.logs import get_framework_logger
 11from plain.runtime import settings as plain_settings
 12
 13from ..constraints import BaseConstraint, CheckConstraint, UniqueConstraint
 14from ..db import get_connection
 15from ..dialect import build_timeout_set_clauses, quote_name
 16from ..indexes import Index
 17
 18if TYPE_CHECKING:
 19    from ..base import Model
 20
 21
 22logger = get_framework_logger()
 23
 24
 25def _convergence_prelude(*, blocking: bool, local: bool) -> str:
 26    """Return the SET prelude for convergence DDL.
 27
 28    `blocking=True` (ACCESS EXCLUSIVE) sets both lock_timeout and
 29    statement_timeout. `blocking=False` (SHARE UPDATE EXCLUSIVE — VALIDATE,
 30    CONCURRENTLY) omits statement_timeout so the non-blocking statement can
 31    run to completion on any table size.
 32    """
 33    return build_timeout_set_clauses(
 34        lock_timeout=plain_settings.POSTGRES_CONVERGENCE_LOCK_TIMEOUT,
 35        statement_timeout=(
 36            plain_settings.POSTGRES_CONVERGENCE_STATEMENT_TIMEOUT if blocking else None
 37        ),
 38        local=local,
 39    )
 40
 41
 42def _execute_and_commit(sql: str | list[str], *, blocking: bool = True) -> None:
 43    """Execute DDL in a committed transaction with convergence timeouts.
 44
 45    Accepts a single SQL string or a list of statements that must share one
 46    transaction (e.g. SetNotNullFix step 3: SET NOT NULL + DROP temp check
 47    together so no orphan constraint can remain after a partial failure).
 48    """
 49    prelude = _convergence_prelude(blocking=blocking, local=True)
 50    if isinstance(sql, str):
 51        script = prelude + sql
 52    else:
 53        script = prelude + "; ".join(sql)
 54
 55    # psycopg3 simple-query protocol: a single execute() with a multi-statement
 56    # script runs every statement in one transaction. params must be None —
 57    # these are literal DDL strings assembled above.
 58    conn = get_connection()
 59    try:
 60        with conn.cursor() as cursor:
 61            cursor.execute(script)
 62        conn.commit()
 63    except Exception:
 64        conn.rollback()
 65        raise
 66
 67
 68def _execute_autocommit(sql: str) -> None:
 69    """Execute DDL in autocommit mode (required for CONCURRENTLY operations).
 70
 71    CONCURRENTLY holds SHARE UPDATE EXCLUSIVE (non-blocking). Only
 72    lock_timeout is set — statement_timeout is omitted so the operation can
 73    run to completion on any table size.
 74
 75    Autocommit forbids SET LOCAL, and bundling SET with CONCURRENTLY in a
 76    single query forms an implicit transaction block that CONCURRENTLY
 77    rejects. So SET, DDL, and RESET are three separate cursor.execute()
 78    calls. RESET runs in a finally so the session-level timeout doesn't leak
 79    to the next caller even if the DDL fails.
 80    """
 81    conn = get_connection()
 82    if conn.in_atomic_block:
 83        raise RuntimeError("Cannot use CONCURRENTLY inside an atomic block")
 84    old_autocommit = conn.get_autocommit()
 85    if not old_autocommit:
 86        conn.commit()
 87        conn.set_autocommit(True)
 88    try:
 89        # Session-level SET (no LOCAL — autocommit has no transaction to
 90        # scope to). Routed through the dialect helper so the setting value
 91        # is validated the same way as every other timeout SET.
 92        set_prelude = build_timeout_set_clauses(
 93            lock_timeout=plain_settings.POSTGRES_CONVERGENCE_LOCK_TIMEOUT,
 94            statement_timeout=None,
 95            local=False,
 96        )
 97        try:
 98            with conn.cursor() as cursor:
 99                cursor.execute(set_prelude)
100                cursor.execute(sql)
101        finally:
102            # Clear the session-level timeouts. Both are RESET even though
103            # only lock_timeout is set today, so a future change that
104            # introduces statement_timeout on this path can't silently leak
105            # it to the next caller. RESET of an unset value is a no-op. If
106            # RESET itself fails, close the connection so the pool can't
107            # hand it back with leaked session state.
108            try:
109                with conn.cursor() as cursor:
110                    cursor.execute("RESET lock_timeout; RESET statement_timeout")
111            except Exception:
112                logger.warning(
113                    "Failed to RESET session timeouts after autocommit DDL; "
114                    "closing connection to avoid leaking session state",
115                    exc_info=True,
116                )
117                conn.close()
118    finally:
119        # Restore autocommit only if the connection is still open — if we
120        # closed it above due to RESET failure, calling set_autocommit would
121        # force a reconnect just to flip a flag, and any failure there would
122        # mask the original DDL error.
123        if not old_autocommit and conn.connection is not None:
124            conn.set_autocommit(False)
125
126
127class Fix(ABC):
128    """Concrete executable SQL operation for convergence."""
129
130    pass_order: ClassVar[int]
131
132    @abstractmethod
133    def describe(self) -> str: ...
134
135    @abstractmethod
136    def apply(self) -> str: ...
137
138
139@dataclass
140class RebuildIndexFix(Fix):
141    """Drop an INVALID index and recreate it CONCURRENTLY."""
142
143    pass_order = 0
144
145    table: str
146    index: Index
147    model: type[Model]
148
149    def describe(self) -> str:
150        return f"{self.table}: rebuild index {self.index.name}"
151
152    def apply(self) -> str:
153        drop_sql = f"DROP INDEX CONCURRENTLY IF EXISTS {quote_name(self.index.name)}"
154        _execute_autocommit(drop_sql)
155        create_sql = self.index.to_sql(self.model)
156        _execute_autocommit(create_sql)
157        return f"{drop_sql}; {create_sql}"
158
159
160@dataclass
161class RenameIndexFix(Fix):
162    """Rename an index (catalog-only, instant)."""
163
164    pass_order = 1
165
166    table: str
167    old_name: str
168    new_name: str
169
170    def describe(self) -> str:
171        return f"{self.table}: rename index {self.old_name} -> {self.new_name}"
172
173    def apply(self) -> str:
174        sql = f"ALTER INDEX {quote_name(self.old_name)} RENAME TO {quote_name(self.new_name)}"
175        _execute_and_commit(sql)
176        return sql
177
178
179@dataclass
180class CreateIndexFix(Fix):
181    """Create a missing index using CONCURRENTLY (doesn't block writes)."""
182
183    pass_order = 1
184
185    table: str
186    index: Index
187    model: type[Model]
188
189    def describe(self) -> str:
190        return f"{self.table}: create index {self.index.name}"
191
192    def apply(self) -> str:
193        sql = self.index.to_sql(self.model)
194        _execute_autocommit(sql)
195        return sql
196
197
198@dataclass
199class AddConstraintFix(Fix):
200    """Add a missing constraint.
201
202    Check constraints use ADD CONSTRAINT ... NOT VALID + VALIDATE CONSTRAINT
203    in a single apply() — the add is catalog-only (brief lock) and the
204    validate uses SHARE UPDATE EXCLUSIVE which doesn't block writes, so
205    there's no benefit to deferring validation to a later run.
206    Unique constraints use CREATE UNIQUE INDEX CONCURRENTLY + USING INDEX
207    to avoid blocking writes.
208    """
209
210    pass_order = 2
211
212    table: str
213    constraint: BaseConstraint
214    model: type[Model]
215
216    def describe(self) -> str:
217        return f"{self.table}: add constraint {self.constraint.name}"
218
219    def apply(self) -> str:
220        if isinstance(self.constraint, UniqueConstraint):
221            return self._apply_unique()
222        return self._apply_other()
223
224    def _apply_unique(self) -> str:
225        assert isinstance(self.constraint, UniqueConstraint)
226
227        # Step 1: Create unique index concurrently (non-blocking)
228        create_idx = self.constraint.to_sql(self.model, concurrently=True)
229        _execute_autocommit(create_idx)
230
231        # Step 2: Attach as constraint — but only for variants PostgreSQL
232        # accepts.  Partial indexes, expression indexes, and non-default
233        # operator class indexes cannot be attached as constraints; they
234        # remain as unique indexes (same enforcement, no pg_constraint row).
235        if self.constraint.index_only:
236            return create_idx
237
238        add_constraint = self.constraint.to_attach_sql(self.model)
239        try:
240            _execute_and_commit(add_constraint)
241        except Exception:
242            # Clean up the orphaned index if the constraint attachment fails
243            name = quote_name(self.constraint.name)
244            _execute_autocommit(f"DROP INDEX CONCURRENTLY IF EXISTS {name}")
245            raise
246
247        return f"{create_idx}; {add_constraint}"
248
249    def _apply_other(self) -> str:
250        if isinstance(self.constraint, CheckConstraint):
251            add_sql = self.constraint.to_sql(self.model, not_valid=True)
252            _execute_and_commit(add_sql)
253
254            validate_sql = (
255                f"ALTER TABLE {quote_name(self.table)}"
256                f" VALIDATE CONSTRAINT {quote_name(self.constraint.name)}"
257            )
258            _execute_and_commit(validate_sql, blocking=False)
259
260            return f"{add_sql}; {validate_sql}"
261
262        sql = self.constraint.to_sql(self.model)
263        _execute_and_commit(sql)
264        return sql
265
266
267@dataclass
268class AddForeignKeyFix(Fix):
269    """Add a missing FK constraint using NOT VALID, then validate immediately.
270
271    Step 1: ADD CONSTRAINT ... NOT VALID (SHARE ROW EXCLUSIVE, no scan)
272    Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)
273
274    Both steps run in a single apply() because the validate lock is weaker
275    than the add lock — there's no benefit to deferring validation.
276    """
277
278    pass_order = 2
279
280    table: str
281    constraint_name: str
282    column: str
283    target_table: str
284    target_column: str
285    on_delete_clause: str = ""  # e.g. " ON DELETE CASCADE" or "" for NO ACTION
286
287    def describe(self) -> str:
288        return f"{self.table}: add FK {self.constraint_name} ({self.column}{self.target_table}.{self.target_column})"
289
290    def apply(self) -> str:
291        add_sql = (
292            f"ALTER TABLE {quote_name(self.table)}"
293            f" ADD CONSTRAINT {quote_name(self.constraint_name)}"
294            f" FOREIGN KEY ({quote_name(self.column)})"
295            f" REFERENCES {quote_name(self.target_table)} ({quote_name(self.target_column)})"
296            f"{self.on_delete_clause}"
297            f" DEFERRABLE INITIALLY DEFERRED"
298            f" NOT VALID"
299        )
300        _execute_and_commit(add_sql)
301
302        validate_sql = (
303            f"ALTER TABLE {quote_name(self.table)}"
304            f" VALIDATE CONSTRAINT {quote_name(self.constraint_name)}"
305        )
306        _execute_and_commit(validate_sql, blocking=False)
307
308        return f"{add_sql}; {validate_sql}"
309
310
311@dataclass
312class ReplaceForeignKeyFix(Fix):
313    """Swap a FK's ON DELETE action, then validate.
314
315    Step 1: ALTER TABLE DROP CONSTRAINT + ADD CONSTRAINT ... NOT VALID
316            (ACCESS EXCLUSIVE on the referenced table's DROP, but the
317            statement is catalog-only — no table scan)
318    Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)
319
320    Between steps the constraint exists as NOT VALID, which still enforces
321    on new inserts/updates — so there is no window of unsafe writes. The
322    DROP and ADD share a single ALTER TABLE statement so the old
323    constraint is never absent in the catalog.
324
325    VALIDATE still scans the table, but under a weaker lock than a naive
326    DROP + ADD would take for the validation scan. Existing rows were
327    already valid under the previous constraint (on_delete action doesn't
328    affect what is validated — only what happens at delete time), so
329    the scan will pass unless the data was corrupted out-of-band.
330    """
331
332    pass_order = 2
333
334    table: str
335    constraint_name: str
336    column: str
337    target_table: str
338    target_column: str
339    on_delete_clause: str
340
341    def describe(self) -> str:
342        return f"{self.table}: update FK {self.constraint_name} on_delete"
343
344    def apply(self) -> str:
345        replace_sql = (
346            f"ALTER TABLE {quote_name(self.table)}"
347            f" DROP CONSTRAINT {quote_name(self.constraint_name)},"
348            f" ADD CONSTRAINT {quote_name(self.constraint_name)}"
349            f" FOREIGN KEY ({quote_name(self.column)})"
350            f" REFERENCES {quote_name(self.target_table)} ({quote_name(self.target_column)})"
351            f"{self.on_delete_clause}"
352            f" DEFERRABLE INITIALLY DEFERRED"
353            f" NOT VALID"
354        )
355        _execute_and_commit(replace_sql)
356
357        validate_sql = (
358            f"ALTER TABLE {quote_name(self.table)}"
359            f" VALIDATE CONSTRAINT {quote_name(self.constraint_name)}"
360        )
361        _execute_and_commit(validate_sql, blocking=False)
362
363        return f"{replace_sql}; {validate_sql}"
364
365
366@dataclass
367class SetNotNullFix(Fix):
368    """Enforce NOT NULL via CHECK NOT VALID → VALIDATE → SET NOT NULL.
369
370    A bare SET NOT NULL acquires ACCESS EXCLUSIVE and scans the whole table
371    while holding it.  With a validated IS NOT NULL check constraint already
372    in place, Postgres (12+) skips the scan and the lock is brief.
373
374    Transaction boundaries are chosen to keep lock windows narrow:
375
376      1. ADD CHECK (col IS NOT NULL) NOT VALID  — catalog-only, brief lock
377      2. VALIDATE CONSTRAINT                    — SHARE UPDATE EXCLUSIVE scan
378      3. SET NOT NULL + DROP temp check          — atomic, instant catalog ops
379
380    Steps 3+4 share a single commit so no orphaned temp check can remain
381    after the column becomes NOT NULL.  If earlier steps fail, the leftover
382    temp check is cleaned up at the start of the next run (analysis also
383    ignores framework-owned temp checks so they never block sync).
384    """
385
386    pass_order = 2
387
388    table: str
389    column: str
390
391    def describe(self) -> str:
392        return f"{self.table}: set NOT NULL on {self.column}"
393
394    def apply(self) -> str:
395        from .analysis import generate_notnull_check_name
396
397        t = quote_name(self.table)
398        c = quote_name(self.column)
399        check = quote_name(generate_notnull_check_name(self.table, self.column))
400
401        # Clean up any leftover temp constraint from a previous failed run
402        _execute_and_commit(f"ALTER TABLE {t} DROP CONSTRAINT IF EXISTS {check}")
403
404        # Step 1: Add NOT VALID check (no scan, brief lock)
405        add_sql = (
406            f"ALTER TABLE {t} ADD CONSTRAINT {check} CHECK ({c} IS NOT NULL) NOT VALID"
407        )
408        _execute_and_commit(add_sql)
409
410        # Step 2: Validate (SHARE UPDATE EXCLUSIVE — non-blocking scan)
411        validate_sql = f"ALTER TABLE {t} VALIDATE CONSTRAINT {check}"
412        _execute_and_commit(validate_sql, blocking=False)
413
414        # Step 3: SET NOT NULL + drop temp check in one commit.
415        # Both are instant catalog operations (SET NOT NULL skips the scan
416        # thanks to the validated check).  Combining them ensures no orphaned
417        # temp check if SET NOT NULL succeeds.
418        set_sql = f"ALTER TABLE {t} ALTER COLUMN {c} SET NOT NULL"
419        drop_sql = f"ALTER TABLE {t} DROP CONSTRAINT {check}"
420        _execute_and_commit([set_sql, drop_sql])
421
422        return f"{add_sql}; {validate_sql}; {set_sql}; {drop_sql}"
423
424
425@dataclass
426class DropNotNullFix(Fix):
427    """Remove NOT NULL from a column (model now allows NULL).
428
429    DROP NOT NULL is a catalog-only change — no data scan, instant.
430    """
431
432    pass_order = 2
433
434    table: str
435    column: str
436
437    def describe(self) -> str:
438        return f"{self.table}: drop NOT NULL on {self.column}"
439
440    def apply(self) -> str:
441        sql = f"ALTER TABLE {quote_name(self.table)} ALTER COLUMN {quote_name(self.column)} DROP NOT NULL"
442        _execute_and_commit(sql)
443        return sql
444
445
446@dataclass
447class SetColumnDefaultFix(Fix):
448    """Set (or replace) a column's DEFAULT (catalog-only, instant)."""
449
450    pass_order = 2
451
452    table: str
453    column: str
454    default_sql: str
455
456    def describe(self) -> str:
457        return f"{self.table}: set DEFAULT {self.default_sql} on {self.column}"
458
459    def apply(self) -> str:
460        sql = (
461            f"ALTER TABLE {quote_name(self.table)}"
462            f" ALTER COLUMN {quote_name(self.column)}"
463            f" SET DEFAULT {self.default_sql}"
464        )
465        _execute_and_commit(sql)
466        return sql
467
468
469@dataclass
470class DropColumnDefaultFix(Fix):
471    """Drop a column's DEFAULT (catalog-only, instant)."""
472
473    pass_order = 2
474
475    table: str
476    column: str
477
478    def describe(self) -> str:
479        return f"{self.table}: drop DEFAULT on {self.column}"
480
481    def apply(self) -> str:
482        sql = (
483            f"ALTER TABLE {quote_name(self.table)}"
484            f" ALTER COLUMN {quote_name(self.column)}"
485            f" DROP DEFAULT"
486        )
487        _execute_and_commit(sql)
488        return sql
489
490
491@dataclass
492class RenameConstraintFix(Fix):
493    """Rename a constraint (catalog-only, instant).
494
495    For unique constraints, Postgres automatically renames the backing index.
496    """
497
498    pass_order = 2
499
500    table: str
501    old_name: str
502    new_name: str
503
504    def describe(self) -> str:
505        return f"{self.table}: rename constraint {self.old_name} -> {self.new_name}"
506
507    def apply(self) -> str:
508        sql = f"ALTER TABLE {quote_name(self.table)} RENAME CONSTRAINT {quote_name(self.old_name)} TO {quote_name(self.new_name)}"
509        _execute_and_commit(sql)
510        return sql
511
512
513@dataclass
514class ValidateConstraintFix(Fix):
515    """Validate a NOT VALID constraint (SHARE UPDATE EXCLUSIVE — doesn't block writes)."""
516
517    pass_order = 3
518
519    table: str
520    name: str
521
522    def describe(self) -> str:
523        return f"{self.table}: validate constraint {self.name}"
524
525    def apply(self) -> str:
526        sql = f"ALTER TABLE {quote_name(self.table)} VALIDATE CONSTRAINT {quote_name(self.name)}"
527        _execute_and_commit(sql, blocking=False)
528        return sql
529
530
531@dataclass
532class DropConstraintFix(Fix):
533    pass_order = 4
534
535    table: str
536    name: str
537
538    def describe(self) -> str:
539        return f"{self.table}: drop constraint {self.name}"
540
541    def apply(self) -> str:
542        sql = f"ALTER TABLE {quote_name(self.table)} DROP CONSTRAINT {quote_name(self.name)}"
543        _execute_and_commit(sql)
544        return sql
545
546
547@dataclass
548class DropIndexFix(Fix):
549    pass_order = 5
550
551    table: str
552    name: str
553
554    def describe(self) -> str:
555        return f"{self.table}: drop index {self.name}"
556
557    def apply(self) -> str:
558        sql = f"DROP INDEX CONCURRENTLY IF EXISTS {quote_name(self.name)}"
559        _execute_autocommit(sql)
560        return sql
561
562
563@dataclass
564class SetStorageParameterFix(Fix):
565    """Set a single `pg_class.reloptions` parameter (catalog-only, instant)."""
566
567    pass_order = 2
568
569    table: str
570    key: str
571    value: str
572
573    def describe(self) -> str:
574        return f"{self.table}: set storage parameter {self.key} = {self.value}"
575
576    def apply(self) -> str:
577        conn = get_connection()
578        quoted = psycopg.sql.quote(self.value, conn.connection)
579        sql = f"ALTER TABLE {quote_name(self.table)} SET ({self.key} = {quoted})"
580        _execute_and_commit(sql)
581        return sql
582
583
584@dataclass
585class ResetStorageParameterFix(Fix):
586    """Reset a single `pg_class.reloptions` parameter (catalog-only, instant)."""
587
588    pass_order = 2
589
590    table: str
591    key: str
592
593    def describe(self) -> str:
594        return f"{self.table}: reset storage parameter {self.key}"
595
596    def apply(self) -> str:
597        sql = f"ALTER TABLE {quote_name(self.table)} RESET ({self.key})"
598        _execute_and_commit(sql)
599        return sql