1from __future__ import annotations
2
3from abc import ABC, abstractmethod
4from dataclasses import dataclass
5from typing import TYPE_CHECKING, ClassVar
6
7from ..constraints import BaseConstraint, CheckConstraint, UniqueConstraint
8from ..db import get_connection
9from ..dialect import quote_name
10from ..indexes import Index
11
12if TYPE_CHECKING:
13 from ..base import Model
14
15
16def _execute_and_commit(sql: str) -> None:
17 """Execute SQL and commit. Rolls back on failure so the connection stays usable."""
18 conn = get_connection()
19 try:
20 with conn.cursor() as cursor:
21 cursor.execute(sql)
22 conn.commit()
23 except Exception:
24 conn.rollback()
25 raise
26
27
28def _execute_autocommit(sql: str) -> None:
29 """Execute SQL in autocommit mode (required for CONCURRENTLY operations).
30
31 Commits any pending transaction first, since Postgres doesn't allow
32 switching to autocommit while a transaction is active.
33 """
34 conn = get_connection()
35 if conn.in_atomic_block:
36 raise RuntimeError("Cannot use CONCURRENTLY inside an atomic block")
37 old_autocommit = conn.get_autocommit()
38 if not old_autocommit:
39 conn.commit()
40 conn.set_autocommit(True)
41 try:
42 with conn.cursor() as cursor:
43 cursor.execute(sql)
44 finally:
45 if not old_autocommit:
46 conn.set_autocommit(False)
47
48
49class Fix(ABC):
50 """Concrete executable SQL operation for convergence."""
51
52 pass_order: ClassVar[int]
53
54 @abstractmethod
55 def describe(self) -> str: ...
56
57 @abstractmethod
58 def apply(self) -> str: ...
59
60
61@dataclass
62class RebuildIndexFix(Fix):
63 """Drop an INVALID index and recreate it CONCURRENTLY."""
64
65 pass_order = 0
66
67 table: str
68 index: Index
69 model: type[Model]
70
71 def describe(self) -> str:
72 return f"{self.table}: rebuild index {self.index.name}"
73
74 def apply(self) -> str:
75 drop_sql = f"DROP INDEX CONCURRENTLY IF EXISTS {quote_name(self.index.name)}"
76 _execute_autocommit(drop_sql)
77 create_sql = self.index.to_sql(self.model)
78 _execute_autocommit(create_sql)
79 return f"{drop_sql}; {create_sql}"
80
81
82@dataclass
83class RenameIndexFix(Fix):
84 """Rename an index (catalog-only, instant)."""
85
86 pass_order = 1
87
88 table: str
89 old_name: str
90 new_name: str
91
92 def describe(self) -> str:
93 return f"{self.table}: rename index {self.old_name} -> {self.new_name}"
94
95 def apply(self) -> str:
96 sql = f"ALTER INDEX {quote_name(self.old_name)} RENAME TO {quote_name(self.new_name)}"
97 _execute_and_commit(sql)
98 return sql
99
100
101@dataclass
102class CreateIndexFix(Fix):
103 """Create a missing index using CONCURRENTLY (doesn't block writes)."""
104
105 pass_order = 1
106
107 table: str
108 index: Index
109 model: type[Model]
110
111 def describe(self) -> str:
112 return f"{self.table}: create index {self.index.name}"
113
114 def apply(self) -> str:
115 sql = self.index.to_sql(self.model)
116 _execute_autocommit(sql)
117 return sql
118
119
120@dataclass
121class AddConstraintFix(Fix):
122 """Add a missing constraint.
123
124 Check constraints use NOT VALID to avoid a table scan.
125 Unique constraints use CREATE UNIQUE INDEX CONCURRENTLY + USING INDEX
126 to avoid blocking writes.
127 """
128
129 pass_order = 2
130
131 table: str
132 constraint: BaseConstraint
133 model: type[Model]
134
135 def describe(self) -> str:
136 if isinstance(self.constraint, CheckConstraint):
137 return f"{self.table}: add constraint {self.constraint.name} (NOT VALID)"
138 return f"{self.table}: add constraint {self.constraint.name}"
139
140 def apply(self) -> str:
141 if isinstance(self.constraint, UniqueConstraint):
142 return self._apply_unique()
143 return self._apply_other()
144
145 def _apply_unique(self) -> str:
146 assert isinstance(self.constraint, UniqueConstraint)
147
148 # Step 1: Create unique index concurrently (non-blocking)
149 create_idx = self.constraint.to_sql(self.model, concurrently=True)
150 _execute_autocommit(create_idx)
151
152 # Step 2: Attach as constraint — but only for variants PostgreSQL
153 # accepts. Partial indexes, expression indexes, and non-default
154 # operator class indexes cannot be attached as constraints; they
155 # remain as unique indexes (same enforcement, no pg_constraint row).
156 if self.constraint.index_only:
157 return create_idx
158
159 add_constraint = self.constraint.to_attach_sql(self.model)
160 try:
161 _execute_and_commit(add_constraint)
162 except Exception:
163 # Clean up the orphaned index if the constraint attachment fails
164 name = quote_name(self.constraint.name)
165 _execute_autocommit(f"DROP INDEX CONCURRENTLY IF EXISTS {name}")
166 raise
167
168 return f"{create_idx}; {add_constraint}"
169
170 def _apply_other(self) -> str:
171 if isinstance(self.constraint, CheckConstraint):
172 sql = self.constraint.to_sql(self.model, not_valid=True)
173 else:
174 sql = self.constraint.to_sql(self.model)
175 _execute_and_commit(sql)
176 return sql
177
178
179@dataclass
180class AddForeignKeyFix(Fix):
181 """Add a missing FK constraint using NOT VALID, then validate immediately.
182
183 Step 1: ADD CONSTRAINT ... NOT VALID (SHARE ROW EXCLUSIVE, no scan)
184 Step 2: VALIDATE CONSTRAINT (SHARE UPDATE EXCLUSIVE, scans data)
185
186 Both steps run in a single apply() because the validate lock is weaker
187 than the add lock — there's no benefit to deferring validation.
188 """
189
190 pass_order = 2
191
192 table: str
193 constraint_name: str
194 column: str
195 target_table: str
196 target_column: str
197
198 def describe(self) -> str:
199 return f"{self.table}: add FK {self.constraint_name} ({self.column} → {self.target_table}.{self.target_column})"
200
201 def apply(self) -> str:
202 add_sql = (
203 f"ALTER TABLE {quote_name(self.table)}"
204 f" ADD CONSTRAINT {quote_name(self.constraint_name)}"
205 f" FOREIGN KEY ({quote_name(self.column)})"
206 f" REFERENCES {quote_name(self.target_table)} ({quote_name(self.target_column)})"
207 f" DEFERRABLE INITIALLY DEFERRED"
208 f" NOT VALID"
209 )
210 _execute_and_commit(add_sql)
211
212 validate_sql = (
213 f"ALTER TABLE {quote_name(self.table)}"
214 f" VALIDATE CONSTRAINT {quote_name(self.constraint_name)}"
215 )
216 _execute_and_commit(validate_sql)
217
218 return f"{add_sql}; {validate_sql}"
219
220
221@dataclass
222class SetNotNullFix(Fix):
223 """Enforce NOT NULL via CHECK NOT VALID → VALIDATE → SET NOT NULL.
224
225 A bare SET NOT NULL acquires ACCESS EXCLUSIVE and scans the whole table
226 while holding it. With a validated IS NOT NULL check constraint already
227 in place, Postgres (12+) skips the scan and the lock is brief.
228
229 Transaction boundaries are chosen to keep lock windows narrow:
230
231 1. ADD CHECK (col IS NOT NULL) NOT VALID — catalog-only, brief lock
232 2. VALIDATE CONSTRAINT — SHARE UPDATE EXCLUSIVE scan
233 3. SET NOT NULL + DROP temp check — atomic, instant catalog ops
234
235 Steps 3+4 share a single commit so no orphaned temp check can remain
236 after the column becomes NOT NULL. If earlier steps fail, the leftover
237 temp check is cleaned up at the start of the next run (analysis also
238 ignores framework-owned temp checks so they never block sync).
239 """
240
241 pass_order = 2
242
243 table: str
244 column: str
245
246 def describe(self) -> str:
247 return f"{self.table}: set NOT NULL on {self.column}"
248
249 def apply(self) -> str:
250 from .analysis import generate_notnull_check_name
251
252 t = quote_name(self.table)
253 c = quote_name(self.column)
254 check = quote_name(generate_notnull_check_name(self.table, self.column))
255
256 # Clean up any leftover temp constraint from a previous failed run
257 _execute_and_commit(f"ALTER TABLE {t} DROP CONSTRAINT IF EXISTS {check}")
258
259 # Step 1: Add NOT VALID check (no scan, brief lock)
260 add_sql = (
261 f"ALTER TABLE {t} ADD CONSTRAINT {check} CHECK ({c} IS NOT NULL) NOT VALID"
262 )
263 _execute_and_commit(add_sql)
264
265 # Step 2: Validate (SHARE UPDATE EXCLUSIVE — non-blocking scan)
266 validate_sql = f"ALTER TABLE {t} VALIDATE CONSTRAINT {check}"
267 _execute_and_commit(validate_sql)
268
269 # Step 3: SET NOT NULL + drop temp check in one commit.
270 # Both are instant catalog operations (SET NOT NULL skips the scan
271 # thanks to the validated check). Combining them ensures no orphaned
272 # temp check if SET NOT NULL succeeds.
273 set_sql = f"ALTER TABLE {t} ALTER COLUMN {c} SET NOT NULL"
274 drop_sql = f"ALTER TABLE {t} DROP CONSTRAINT {check}"
275 conn = get_connection()
276 try:
277 with conn.cursor() as cursor:
278 cursor.execute(set_sql)
279 cursor.execute(drop_sql)
280 conn.commit()
281 except Exception:
282 conn.rollback()
283 raise
284
285 return f"{add_sql}; {validate_sql}; {set_sql}; {drop_sql}"
286
287
288@dataclass
289class DropNotNullFix(Fix):
290 """Remove NOT NULL from a column (model now allows NULL).
291
292 DROP NOT NULL is a catalog-only change — no data scan, instant.
293 """
294
295 pass_order = 2
296
297 table: str
298 column: str
299
300 def describe(self) -> str:
301 return f"{self.table}: drop NOT NULL on {self.column}"
302
303 def apply(self) -> str:
304 sql = f"ALTER TABLE {quote_name(self.table)} ALTER COLUMN {quote_name(self.column)} DROP NOT NULL"
305 _execute_and_commit(sql)
306 return sql
307
308
309@dataclass
310class RenameConstraintFix(Fix):
311 """Rename a constraint (catalog-only, instant).
312
313 For unique constraints, Postgres automatically renames the backing index.
314 """
315
316 pass_order = 2
317
318 table: str
319 old_name: str
320 new_name: str
321
322 def describe(self) -> str:
323 return f"{self.table}: rename constraint {self.old_name} -> {self.new_name}"
324
325 def apply(self) -> str:
326 sql = f"ALTER TABLE {quote_name(self.table)} RENAME CONSTRAINT {quote_name(self.old_name)} TO {quote_name(self.new_name)}"
327 _execute_and_commit(sql)
328 return sql
329
330
331@dataclass
332class ValidateConstraintFix(Fix):
333 """Validate a NOT VALID constraint (SHARE UPDATE EXCLUSIVE — doesn't block writes)."""
334
335 pass_order = 3
336
337 table: str
338 name: str
339
340 def describe(self) -> str:
341 return f"{self.table}: validate constraint {self.name}"
342
343 def apply(self) -> str:
344 sql = f"ALTER TABLE {quote_name(self.table)} VALIDATE CONSTRAINT {quote_name(self.name)}"
345 _execute_and_commit(sql)
346 return sql
347
348
349@dataclass
350class DropConstraintFix(Fix):
351 pass_order = 4
352
353 table: str
354 name: str
355
356 def describe(self) -> str:
357 return f"{self.table}: drop constraint {self.name}"
358
359 def apply(self) -> str:
360 sql = f"ALTER TABLE {quote_name(self.table)} DROP CONSTRAINT {quote_name(self.name)}"
361 _execute_and_commit(sql)
362 return sql
363
364
365@dataclass
366class DropIndexFix(Fix):
367 pass_order = 5
368
369 table: str
370 name: str
371
372 def describe(self) -> str:
373 return f"{self.table}: drop index {self.name}"
374
375 def apply(self) -> str:
376 sql = f"DROP INDEX CONCURRENTLY IF EXISTS {quote_name(self.name)}"
377 _execute_autocommit(sql)
378 return sql