1from __future__ import annotations
2
3from abc import ABC, abstractmethod
4from dataclasses import dataclass
5from typing import TYPE_CHECKING, ClassVar
6
7import psycopg
8import psycopg.sql
9
10from plain.logs import get_framework_logger
11from plain.runtime import settings as plain_settings
12
13from ..constraints import BaseConstraint, CheckConstraint, UniqueConstraint
14from ..db import get_connection
15from ..dialect import build_timeout_set_clauses, quote_name
16from ..indexes import Index
17
18if TYPE_CHECKING:
19 from ..base import Model
20
21
# Module-wide logger, obtained from plain's framework logging helper.
logger = get_framework_logger()
23
24
def _convergence_prelude(*, blocking: bool, local: bool) -> str:
    """Build the timeout SET clauses that precede convergence DDL.

    With `blocking=True` (ACCESS EXCLUSIVE statements) both lock_timeout and
    statement_timeout are requested. With `blocking=False` (SHARE UPDATE
    EXCLUSIVE — VALIDATE, CONCURRENTLY) statement_timeout is passed as None
    so the non-blocking statement may run to completion on any table size.

    `local` is forwarded unchanged to `build_timeout_set_clauses`.
    """
    lock_timeout = plain_settings.POSTGRES_CONVERGENCE_LOCK_TIMEOUT
    if blocking:
        statement_timeout = plain_settings.POSTGRES_CONVERGENCE_STATEMENT_TIMEOUT
    else:
        statement_timeout = None

    return build_timeout_set_clauses(
        lock_timeout=lock_timeout,
        statement_timeout=statement_timeout,
        local=local,
    )
40
41
def _execute_and_commit(sql: str | list[str], *, blocking: bool = True) -> None:
    """Run DDL under the convergence timeouts and commit it.

    `sql` may be one statement or a list of statements. A list is joined with
    "; " and sent as a single script so every statement shares one
    transaction (e.g. SetNotNullFix step 3: SET NOT NULL + DROP temp check,
    so no orphan constraint can remain after a partial failure).

    `blocking` selects which timeouts the prelude carries; see
    `_convergence_prelude`.

    On any exception the transaction is rolled back and the exception is
    re-raised unchanged.
    """
    prelude = _convergence_prelude(blocking=blocking, local=True)
    statements = [sql] if isinstance(sql, str) else sql
    script = prelude + "; ".join(statements)

    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            # psycopg3 simple-query protocol: one execute() of a
            # multi-statement script runs all of it in one transaction.
            # No params are passed — the script is literal DDL built above.
            cursor.execute(script)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
66
67
def _execute_autocommit(sql: str) -> None:
    """Run one DDL statement in autocommit mode (needed for CONCURRENTLY).

    CONCURRENTLY holds SHARE UPDATE EXCLUSIVE (non-blocking), so only
    lock_timeout is set; statement_timeout is left out to let the operation
    finish regardless of table size.

    SET LOCAL is not available under autocommit, and sending SET together
    with the CONCURRENTLY statement in one query would form an implicit
    transaction block, which CONCURRENTLY rejects. The SET, the DDL, and the
    RESET are therefore three separate cursor.execute() calls. The RESET
    lives in a `finally` so the session-level timeout cannot leak to the
    next user of the connection, even when the DDL fails.

    Raises:
        RuntimeError: if the connection is currently inside an atomic block.
    """
    conn = get_connection()
    if conn.in_atomic_block:
        raise RuntimeError("Cannot use CONCURRENTLY inside an atomic block")

    # Remember whether we have to flip the connection back afterwards.
    switched_to_autocommit = not conn.get_autocommit()
    if switched_to_autocommit:
        conn.commit()
        conn.set_autocommit(True)

    try:
        # Session-level SET (local=False): under autocommit there is no
        # transaction to scope a SET LOCAL to. Built via the dialect helper
        # so the value goes through the same path as every other timeout SET.
        timeout_sql = build_timeout_set_clauses(
            lock_timeout=plain_settings.POSTGRES_CONVERGENCE_LOCK_TIMEOUT,
            statement_timeout=None,
            local=False,
        )
        try:
            with conn.cursor() as ddl_cursor:
                for statement in (timeout_sql, sql):
                    ddl_cursor.execute(statement)
        finally:
            # Always clear the session-level timeouts. statement_timeout is
            # reset too, although only lock_timeout is set today, so that a
            # later change adding it on this path cannot leak it. Should the
            # RESET itself fail, the connection is closed so it cannot be
            # reused with leftover session state.
            try:
                with conn.cursor() as reset_cursor:
                    reset_cursor.execute("RESET lock_timeout; RESET statement_timeout")
            except Exception:
                logger.warning(
                    "Failed to RESET session timeouts after autocommit DDL; "
                    "closing connection to avoid leaking session state",
                    exc_info=True,
                )
                conn.close()
    finally:
        # Only restore the previous mode on a connection that is still open.
        # After a close (RESET failure above) set_autocommit would reconnect
        # merely to flip a flag, and an error there would hide the DDL error.
        if switched_to_autocommit and conn.connection is not None:
            conn.set_autocommit(False)
125
126
class Fix(ABC):
    """Abstract base for a single executable convergence SQL operation.

    Concrete fixes set `pass_order` and implement `describe()` and `apply()`.
    """

    # Integer declared here and assigned by each subclass.
    # NOTE(review): presumably used to order fixes into passes — confirm
    # against the code that consumes it.
    pass_order: ClassVar[int]

    @abstractmethod
    def describe(self) -> str:
        """Return a short human-readable summary of the operation."""

    @abstractmethod
    def apply(self) -> str:
        """Execute the operation and return the SQL text that was run."""
137
138
@dataclass
class RebuildIndexFix(Fix):
    """Replace an INVALID index: drop it, then create it again CONCURRENTLY.

    Both statements go through `_execute_autocommit`, one call each.
    """

    pass_order = 0

    table: str  # table name, used in the description only
    index: Index  # index definition; supplies the name and the CREATE SQL
    model: type[Model]  # model class passed to `index.to_sql`

    def describe(self) -> str:
        """Return "<table>: rebuild index <index name>"."""
        index_name = self.index.name
        return f"{self.table}: rebuild index {index_name}"

    def apply(self) -> str:
        """Drop and recreate the index; return both statements joined by "; "."""
        quoted_index = quote_name(self.index.name)
        drop_statement = f"DROP INDEX CONCURRENTLY IF EXISTS {quoted_index}"
        _execute_autocommit(drop_statement)

        create_statement = self.index.to_sql(self.model)
        _execute_autocommit(create_statement)

        return f"{drop_statement}; {create_statement}"
158
159
@dataclass
class RenameIndexFix(Fix):
    """Rename an index with ALTER INDEX ... RENAME TO (catalog-only, instant)."""

    pass_order = 1

    table: str  # table name, used in the description only
    old_name: str  # current index name
    new_name: str  # index name after the rename

    def describe(self) -> str:
        """Return "<table>: rename index <old> -> <new>"."""
        rename = f"{self.old_name} -> {self.new_name}"
        return f"{self.table}: rename index {rename}"

    def apply(self) -> str:
        """Execute and commit the rename; return the statement that was run."""
        current = quote_name(self.old_name)
        desired = quote_name(self.new_name)
        statement = f"ALTER INDEX {current} RENAME TO {desired}"
        _execute_and_commit(statement)
        return statement
177
178
@dataclass
class CreateIndexFix(Fix):
    """Build a missing index with CONCURRENTLY so writes are not blocked."""

    pass_order = 1

    table: str  # table name, used in the description only
    index: Index  # index definition; supplies the name and the CREATE SQL
    model: type[Model]  # model class passed to `index.to_sql`

    def describe(self) -> str:
        """Return "<table>: create index <index name>"."""
        index_name = self.index.name
        return f"{self.table}: create index {index_name}"

    def apply(self) -> str:
        """Run the index's CREATE statement in autocommit mode and return it."""
        create_statement = self.index.to_sql(self.model)
        _execute_autocommit(create_statement)
        return create_statement
196
197
@dataclass
class AddConstraintFix(Fix):
    """Add a missing constraint.

    Check constraints use ADD CONSTRAINT ... NOT VALID + VALIDATE CONSTRAINT
    in a single apply() — the add is catalog-only (brief lock) and the
    validate uses SHARE UPDATE EXCLUSIVE which doesn't block writes, so
    there's no benefit to deferring validation to a later run.
    Unique constraints use CREATE UNIQUE INDEX CONCURRENTLY + USING INDEX
    to avoid blocking writes.
    Any other constraint type is added with its plain `to_sql()` statement.
    """

    pass_order = 2

    table: str  # table the constraint belongs to
    constraint: BaseConstraint  # constraint definition; supplies name and SQL
    model: type[Model]  # model class passed to the constraint's SQL builders

    def describe(self) -> str:
        """Return "<table>: add constraint <constraint name>"."""
        return f"{self.table}: add constraint {self.constraint.name}"

    def apply(self) -> str:
        """Add the constraint and return the SQL that was executed.

        Unique constraints take the concurrent-index path; everything else
        goes through `_apply_other`.
        """
        if isinstance(self.constraint, UniqueConstraint):
            return self._apply_unique()
        return self._apply_other()

    def _apply_unique(self) -> str:
        """Create the unique index concurrently, then attach it as a constraint.

        Returns only the CREATE statement for index-only variants, otherwise
        the CREATE and attach statements joined by "; ".

        Raises:
            Exception: whatever the attach step raised. A failure of the
                follow-up index cleanup is logged and never replaces it.
        """
        assert isinstance(self.constraint, UniqueConstraint)

        # Step 1: Create unique index concurrently (non-blocking)
        create_idx = self.constraint.to_sql(self.model, concurrently=True)
        _execute_autocommit(create_idx)

        # Step 2: Attach as constraint — but only for variants PostgreSQL
        # accepts. Partial indexes, expression indexes, and non-default
        # operator class indexes cannot be attached as constraints; they
        # remain as unique indexes (same enforcement, no pg_constraint row).
        if self.constraint.index_only:
            return create_idx

        add_constraint = self.constraint.to_attach_sql(self.model)
        try:
            _execute_and_commit(add_constraint)
        except Exception:
            # Best-effort removal of the now-orphaned index. The cleanup can
            # fail for the same reason the attach did (e.g. a lock timeout
            # on a busy table); guard it so the attach error — the one the
            # caller needs to see — is what propagates.
            name = quote_name(self.constraint.name)
            try:
                _execute_autocommit(f"DROP INDEX CONCURRENTLY IF EXISTS {name}")
            except Exception:
                logger.warning(
                    f"Failed to drop orphaned unique index {name} after "
                    "constraint attachment failed",
                    exc_info=True,
                )
            # Bare raise re-raises the attach error: the inner handler has
            # finished, so the outer exception is the active one again.
            raise

        return f"{create_idx}; {add_constraint}"

    def _apply_other(self) -> str:
        """Add a non-unique constraint and return the SQL that was executed.

        Check constraints are added NOT VALID and validated right away in a
        second, non-blocking transaction. Other types run `to_sql()` as-is.
        """
        if isinstance(self.constraint, CheckConstraint):
            add_sql = self.constraint.to_sql(self.model, not_valid=True)
            _execute_and_commit(add_sql)

            validate_sql = (
                f"ALTER TABLE {quote_name(self.table)}"
                f" VALIDATE CONSTRAINT {quote_name(self.constraint.name)}"
            )
            # blocking=False: no statement_timeout, the scan may take a while.
            _execute_and_commit(validate_sql, blocking=False)

            return f"{add_sql}; {validate_sql}"

        sql = self.constraint.to_sql(self.model)
        _execute_and_commit(sql)
        return sql
265
266
@dataclass
class AddForeignKeyFix(Fix):
    """Create a missing foreign key as NOT VALID and validate it right away.

    Step 1: ADD CONSTRAINT ... NOT VALID (SHARE ROW EXCLUSIVE, no scan)
    Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)

    Validation is not deferred to a later run: its lock is weaker than the
    one the add step takes, so postponing it would gain nothing. Each step
    is committed in its own transaction.
    """

    pass_order = 2

    table: str  # table that owns the FK column
    constraint_name: str  # name of the constraint to create
    column: str  # referencing column
    target_table: str  # referenced table
    target_column: str  # referenced column
    on_delete_clause: str = ""  # e.g. " ON DELETE CASCADE" or "" for NO ACTION

    def describe(self) -> str:
        """Return a one-line summary naming the FK and both of its ends."""
        return f"{self.table}: add FK {self.constraint_name} ({self.column} → {self.target_table}.{self.target_column})"

    def apply(self) -> str:
        """Add then validate the FK; return both statements joined by "; "."""
        table = quote_name(self.table)
        constraint = quote_name(self.constraint_name)
        column = quote_name(self.column)
        ref_table = quote_name(self.target_table)
        ref_column = quote_name(self.target_column)

        add_sql = (
            f"ALTER TABLE {table} ADD CONSTRAINT {constraint}"
            f" FOREIGN KEY ({column})"
            f" REFERENCES {ref_table} ({ref_column})"
            f"{self.on_delete_clause}"
            " DEFERRABLE INITIALLY DEFERRED NOT VALID"
        )
        _execute_and_commit(add_sql)

        # blocking=False: the validation scan runs without statement_timeout.
        validate_sql = f"ALTER TABLE {table} VALIDATE CONSTRAINT {constraint}"
        _execute_and_commit(validate_sql, blocking=False)

        return "; ".join((add_sql, validate_sql))
309
310
@dataclass
class ReplaceForeignKeyFix(Fix):
    """Change a foreign key's ON DELETE action, then validate it.

    Step 1: one ALTER TABLE with DROP CONSTRAINT + ADD CONSTRAINT ... NOT
            VALID (ACCESS EXCLUSIVE on the referenced table's DROP, but the
            statement is catalog-only — no table scan)
    Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)

    Because the DROP and the ADD are part of the same ALTER TABLE statement,
    the catalog never lacks the constraint. Between the two steps it exists
    as NOT VALID, which still enforces new inserts/updates, so no unsafe
    writes can slip in.

    VALIDATE still scans the table, but under a weaker lock than a naive
    DROP + ADD would hold for that scan. Rows that satisfied the previous
    constraint satisfy the new one as well (the on_delete action only
    matters at delete time, not for validation), so the scan passes unless
    data was corrupted out-of-band.
    """

    pass_order = 2

    table: str  # table that owns the FK column
    constraint_name: str  # constraint to drop and re-add under the same name
    column: str  # referencing column
    target_table: str  # referenced table
    target_column: str  # referenced column
    on_delete_clause: str  # new ON DELETE clause, inserted verbatim

    def describe(self) -> str:
        """Return "<table>: update FK <constraint name> on_delete"."""
        return f"{self.table}: update FK {self.constraint_name} on_delete"

    def apply(self) -> str:
        """Swap then validate the FK; return both statements joined by "; "."""
        table = quote_name(self.table)
        constraint = quote_name(self.constraint_name)
        column = quote_name(self.column)
        ref_table = quote_name(self.target_table)
        ref_column = quote_name(self.target_column)

        replace_sql = (
            f"ALTER TABLE {table}"
            f" DROP CONSTRAINT {constraint},"
            f" ADD CONSTRAINT {constraint}"
            f" FOREIGN KEY ({column})"
            f" REFERENCES {ref_table} ({ref_column})"
            f"{self.on_delete_clause}"
            " DEFERRABLE INITIALLY DEFERRED NOT VALID"
        )
        _execute_and_commit(replace_sql)

        # blocking=False: the validation scan runs without statement_timeout.
        validate_sql = f"ALTER TABLE {table} VALIDATE CONSTRAINT {constraint}"
        _execute_and_commit(validate_sql, blocking=False)

        return "; ".join((replace_sql, validate_sql))
364
365
@dataclass
class SetNotNullFix(Fix):
    """Enforce NOT NULL via CHECK NOT VALID → VALIDATE → SET NOT NULL.

    A bare SET NOT NULL takes ACCESS EXCLUSIVE and scans the whole table
    while holding it. When a validated IS NOT NULL check constraint already
    exists, Postgres (12+) skips that scan and the lock is brief.

    Each step is its own transaction so lock windows stay narrow:

    1. ADD CHECK (col IS NOT NULL) NOT VALID — catalog-only, brief lock
    2. VALIDATE CONSTRAINT — SHARE UPDATE EXCLUSIVE scan
    3. SET NOT NULL + DROP temp check — atomic, instant catalog ops

    The two statements of step 3 share one commit, so no orphaned temp check
    can remain once the column is NOT NULL. If an earlier step fails, the
    leftover temp check is dropped at the start of the next apply()
    (analysis also ignores framework-owned temp checks so they never block
    sync).
    """

    pass_order = 2

    table: str  # table that owns the column
    column: str  # column that should become NOT NULL

    def describe(self) -> str:
        """Return "<table>: set NOT NULL on <column>"."""
        return f"{self.table}: set NOT NULL on {self.column}"

    def apply(self) -> str:
        """Run the three-step sequence and return its statements joined by "; ".

        The initial cleanup DROP is executed but not included in the
        returned text.
        """
        from .analysis import generate_notnull_check_name

        table_q = quote_name(self.table)
        column_q = quote_name(self.column)
        check_q = quote_name(generate_notnull_check_name(self.table, self.column))
        alter = f"ALTER TABLE {table_q}"

        # Remove a temp constraint that a previous failed run may have left.
        _execute_and_commit(f"{alter} DROP CONSTRAINT IF EXISTS {check_q}")

        # Step 1: NOT VALID check — no scan, brief lock.
        add_sql = f"{alter} ADD CONSTRAINT {check_q} CHECK ({column_q} IS NOT NULL) NOT VALID"
        _execute_and_commit(add_sql)

        # Step 2: validate — SHARE UPDATE EXCLUSIVE scan, no statement_timeout.
        validate_sql = f"{alter} VALIDATE CONSTRAINT {check_q}"
        _execute_and_commit(validate_sql, blocking=False)

        # Step 3: SET NOT NULL and drop the temp check in a single commit.
        # Both are instant catalog operations (the validated check lets
        # SET NOT NULL skip its scan); committing them together means the
        # temp check cannot outlive a successful SET NOT NULL.
        set_sql = f"{alter} ALTER COLUMN {column_q} SET NOT NULL"
        drop_sql = f"{alter} DROP CONSTRAINT {check_q}"
        _execute_and_commit([set_sql, drop_sql])

        return "; ".join((add_sql, validate_sql, set_sql, drop_sql))
423
424
@dataclass
class DropNotNullFix(Fix):
    """Drop NOT NULL from a column because the model now permits NULL.

    DROP NOT NULL only touches the catalog — no data scan, instant.
    """

    pass_order = 2

    table: str  # table that owns the column
    column: str  # column that should accept NULL

    def describe(self) -> str:
        """Return "<table>: drop NOT NULL on <column>"."""
        return f"{self.table}: drop NOT NULL on {self.column}"

    def apply(self) -> str:
        """Execute and commit the ALTER; return the statement that was run."""
        table = quote_name(self.table)
        column = quote_name(self.column)
        statement = f"ALTER TABLE {table} ALTER COLUMN {column} DROP NOT NULL"
        _execute_and_commit(statement)
        return statement
444
445
@dataclass
class SetColumnDefaultFix(Fix):
    """Assign or overwrite a column DEFAULT (catalog-only, instant)."""

    pass_order = 2

    table: str  # table that owns the column
    column: str  # column whose default is set
    default_sql: str  # SQL expression for the default, inserted verbatim

    def describe(self) -> str:
        """Return "<table>: set DEFAULT <default_sql> on <column>"."""
        return f"{self.table}: set DEFAULT {self.default_sql} on {self.column}"

    def apply(self) -> str:
        """Execute and commit the ALTER; return the statement that was run."""
        table = quote_name(self.table)
        column = quote_name(self.column)
        statement = (
            f"ALTER TABLE {table} ALTER COLUMN {column} SET DEFAULT {self.default_sql}"
        )
        _execute_and_commit(statement)
        return statement
467
468
@dataclass
class DropColumnDefaultFix(Fix):
    """Remove the DEFAULT from a column (catalog-only, instant)."""

    pass_order = 2

    table: str  # table that owns the column
    column: str  # column whose default is removed

    def describe(self) -> str:
        """Return "<table>: drop DEFAULT on <column>"."""
        return f"{self.table}: drop DEFAULT on {self.column}"

    def apply(self) -> str:
        """Execute and commit the ALTER; return the statement that was run."""
        table = quote_name(self.table)
        column = quote_name(self.column)
        statement = f"ALTER TABLE {table} ALTER COLUMN {column} DROP DEFAULT"
        _execute_and_commit(statement)
        return statement
489
490
@dataclass
class RenameConstraintFix(Fix):
    """Give a constraint a new name (catalog-only, instant).

    When the constraint is a unique constraint, Postgres renames its backing
    index automatically.
    """

    pass_order = 2

    table: str  # table that owns the constraint
    old_name: str  # current constraint name
    new_name: str  # constraint name after the rename

    def describe(self) -> str:
        """Return "<table>: rename constraint <old> -> <new>"."""
        rename = f"{self.old_name} -> {self.new_name}"
        return f"{self.table}: rename constraint {rename}"

    def apply(self) -> str:
        """Execute and commit the rename; return the statement that was run."""
        table = quote_name(self.table)
        current = quote_name(self.old_name)
        desired = quote_name(self.new_name)
        statement = f"ALTER TABLE {table} RENAME CONSTRAINT {current} TO {desired}"
        _execute_and_commit(statement)
        return statement
511
512
@dataclass
class ValidateConstraintFix(Fix):
    """Validate a NOT VALID constraint.

    VALIDATE CONSTRAINT takes SHARE UPDATE EXCLUSIVE, which doesn't block
    writes, so it is executed with `blocking=False`.
    """

    pass_order = 3

    table: str  # table that owns the constraint
    name: str  # constraint to validate

    def describe(self) -> str:
        """Return "<table>: validate constraint <name>"."""
        return f"{self.table}: validate constraint {self.name}"

    def apply(self) -> str:
        """Execute and commit the validation; return the statement that was run."""
        table = quote_name(self.table)
        constraint = quote_name(self.name)
        statement = f"ALTER TABLE {table} VALIDATE CONSTRAINT {constraint}"
        _execute_and_commit(statement, blocking=False)
        return statement
529
530
@dataclass
class DropConstraintFix(Fix):
    """Drop a constraint with ALTER TABLE ... DROP CONSTRAINT.

    No IF EXISTS is used, so the statement fails if the constraint is absent.
    """

    pass_order = 4

    table: str  # table that owns the constraint
    name: str  # constraint to drop

    def describe(self) -> str:
        """Return "<table>: drop constraint <name>"."""
        return f"{self.table}: drop constraint {self.name}"

    def apply(self) -> str:
        """Execute and commit the drop; return the statement that was run."""
        table = quote_name(self.table)
        constraint = quote_name(self.name)
        statement = f"ALTER TABLE {table} DROP CONSTRAINT {constraint}"
        _execute_and_commit(statement)
        return statement
545
546
@dataclass
class DropIndexFix(Fix):
    """Drop an index with DROP INDEX CONCURRENTLY IF EXISTS.

    Executed through `_execute_autocommit`, as CONCURRENTLY requires.
    """

    pass_order = 5

    table: str  # table name, used in the description only
    name: str  # index to drop

    def describe(self) -> str:
        """Return "<table>: drop index <name>"."""
        return f"{self.table}: drop index {self.name}"

    def apply(self) -> str:
        """Run the drop in autocommit mode; return the statement that was run."""
        index = quote_name(self.name)
        statement = f"DROP INDEX CONCURRENTLY IF EXISTS {index}"
        _execute_autocommit(statement)
        return statement
561
562
@dataclass
class SetStorageParameterFix(Fix):
    """Set one `pg_class.reloptions` parameter (catalog-only, instant)."""

    pass_order = 2

    table: str  # table whose storage parameter is set
    key: str  # parameter name, inserted into the SQL verbatim
    value: str  # parameter value, quoted as a SQL literal before use

    def describe(self) -> str:
        """Return "<table>: set storage parameter <key> = <value>"."""
        assignment = f"{self.key} = {self.value}"
        return f"{self.table}: set storage parameter {assignment}"

    def apply(self) -> str:
        """Execute and commit ALTER TABLE ... SET; return the statement that was run."""
        # Quote the value via psycopg, using the underlying connection as context.
        raw_connection = get_connection().connection
        literal = psycopg.sql.quote(self.value, raw_connection)
        table = quote_name(self.table)
        statement = f"ALTER TABLE {table} SET ({self.key} = {literal})"
        _execute_and_commit(statement)
        return statement
582
583
@dataclass
class ResetStorageParameterFix(Fix):
    """Reset one `pg_class.reloptions` parameter (catalog-only, instant)."""

    pass_order = 2

    table: str  # table whose storage parameter is reset
    key: str  # parameter name, inserted into the SQL verbatim

    def describe(self) -> str:
        """Return "<table>: reset storage parameter <key>"."""
        return f"{self.table}: reset storage parameter {self.key}"

    def apply(self) -> str:
        """Execute and commit ALTER TABLE ... RESET; return the statement that was run."""
        table = quote_name(self.table)
        statement = f"ALTER TABLE {table} RESET ({self.key})"
        _execute_and_commit(statement)
        return statement