1from __future__ import annotations
2
3import logging
4import operator
5from collections.abc import Callable, Generator
6from copy import deepcopy
7from datetime import datetime
8from typing import TYPE_CHECKING, Any
9
10from psycopg import sql as psycopg_sql
11
12if TYPE_CHECKING:
13 from typing import Self
14
15 from plain.models.sql.compiler import SQLCompiler
16
17from plain.models.constraints import Deferrable
18from plain.models.fields import DbParameters, Field
19from plain.models.fields.related import ForeignKeyField, RelatedField
20from plain.models.fields.reverse_related import ForeignObjectRel, ManyToManyRel
21from plain.models.indexes import Index
22from plain.models.postgres.sql import (
23 DATA_TYPE_CHECK_CONSTRAINTS,
24 DATA_TYPES,
25 DEFERRABLE_SQL,
26 MAX_NAME_LENGTH,
27 quote_name,
28)
29from plain.models.postgres.utils import names_digest, split_identifier, strip_quotes
30from plain.models.sql import Query
31from plain.models.transaction import atomic
32from plain.utils import timezone
33
34if TYPE_CHECKING:
35 from collections.abc import Iterable
36
37 from plain.models.base import Model
38 from plain.models.constraints import BaseConstraint
39 from plain.models.fields import Field
40 from plain.models.fields.related import ForeignKeyField
41 from plain.models.fields.reverse_related import ManyToManyRel
42 from plain.models.postgres.wrapper import DatabaseWrapper
43
44logger = logging.getLogger("plain.models.postgres.schema")
45
46
47# ##### DDL Reference classes (for deferred DDL statement manipulation) #####
48
49
50class Table:
51 """Hold a reference to a table."""
52
53 def __init__(self, table: str) -> None:
54 self.table = table
55
56 def references_table(self, table: str) -> bool:
57 return self.table == table
58
59 def references_column(self, table: str, column: str) -> bool:
60 return False
61
62 def rename_table_references(self, old_table: str, new_table: str) -> None:
63 if self.table == old_table:
64 self.table = new_table
65
66 def rename_column_references(
67 self, table: str, old_column: str, new_column: str
68 ) -> None:
69 pass
70
71 def __repr__(self) -> str:
72 return f"<{self.__class__.__name__} {str(self)!r}>"
73
74 def __str__(self) -> str:
75 return quote_name(self.table)
76
77
78class TableColumns(Table):
79 """Base class for references to multiple columns of a table."""
80
81 def __init__(self, table: str, columns: list[str]) -> None:
82 self.table = table
83 self.columns = columns
84
85 def references_column(self, table: str, column: str) -> bool:
86 return self.table == table and column in self.columns
87
88 def rename_column_references(
89 self, table: str, old_column: str, new_column: str
90 ) -> None:
91 if self.table == table:
92 for index, column in enumerate(self.columns):
93 if column == old_column:
94 self.columns[index] = new_column
95
96
97class Columns(TableColumns):
98 """Hold a reference to one or many columns."""
99
100 def __init__(
101 self,
102 table: str,
103 columns: list[str],
104 col_suffixes: tuple[str, ...] = (),
105 opclasses: tuple[str, ...] = (),
106 ) -> None:
107 self.col_suffixes = col_suffixes
108 self.opclasses = opclasses
109 super().__init__(table, columns)
110
111 def __str__(self) -> str:
112 def col_str(column: str, idx: int) -> str:
113 col = quote_name(column)
114 # If opclasses are provided, include them
115 if self.opclasses:
116 col = f"{col} {self.opclasses[idx]}"
117 try:
118 suffix = self.col_suffixes[idx]
119 if suffix:
120 col = f"{col} {suffix}"
121 except IndexError:
122 pass
123 return col
124
125 return ", ".join(
126 col_str(column, idx) for idx, column in enumerate(self.columns)
127 )
128
129
130class IndexName(TableColumns):
131 """Hold a reference to an index name."""
132
133 def __init__(
134 self,
135 table: str,
136 columns: list[str],
137 suffix: str,
138 create_index_name: Callable[[str, list[str], str], str],
139 ) -> None:
140 self.suffix = suffix
141 self.create_index_name = create_index_name
142 super().__init__(table, columns)
143
144 def __str__(self) -> str:
145 return self.create_index_name(self.table, self.columns, self.suffix)
146
147
148class ForeignKeyName(TableColumns):
149 """Hold a reference to a foreign key name."""
150
151 def __init__(
152 self,
153 from_table: str,
154 from_columns: list[str],
155 to_table: str,
156 to_columns: list[str],
157 suffix_template: str,
158 create_fk_name: Callable[[str, list[str], str], str],
159 ) -> None:
160 self.to_reference = TableColumns(to_table, to_columns)
161 self.suffix_template = suffix_template
162 self.create_fk_name = create_fk_name
163 super().__init__(
164 from_table,
165 from_columns,
166 )
167
168 def references_table(self, table: str) -> bool:
169 return super().references_table(table) or self.to_reference.references_table(
170 table
171 )
172
173 def references_column(self, table: str, column: str) -> bool:
174 return super().references_column(
175 table, column
176 ) or self.to_reference.references_column(table, column)
177
178 def rename_table_references(self, old_table: str, new_table: str) -> None:
179 super().rename_table_references(old_table, new_table)
180 self.to_reference.rename_table_references(old_table, new_table)
181
182 def rename_column_references(
183 self, table: str, old_column: str, new_column: str
184 ) -> None:
185 super().rename_column_references(table, old_column, new_column)
186 self.to_reference.rename_column_references(table, old_column, new_column)
187
188 def __str__(self) -> str:
189 suffix = self.suffix_template % {
190 "to_table": self.to_reference.table,
191 "to_column": self.to_reference.columns[0],
192 }
193 return self.create_fk_name(self.table, self.columns, suffix)
194
195
196class Statement:
197 """
198 Statement template and formatting parameters container.
199
200 Allows keeping a reference to a statement without interpolating identifiers
201 that might have to be adjusted if they're referencing a table or column
202 that is removed
203 """
204
205 def __init__(self, template: str, **parts: Any) -> None:
206 self.template = template
207 self.parts = parts
208
209 def references_table(self, table: str) -> bool:
210 return any(
211 hasattr(part, "references_table") and part.references_table(table)
212 for part in self.parts.values()
213 )
214
215 def references_column(self, table: str, column: str) -> bool:
216 return any(
217 hasattr(part, "references_column") and part.references_column(table, column)
218 for part in self.parts.values()
219 )
220
221 def rename_table_references(self, old_table: str, new_table: str) -> None:
222 for part in self.parts.values():
223 if hasattr(part, "rename_table_references"):
224 part.rename_table_references(old_table, new_table)
225
226 def rename_column_references(
227 self, table: str, old_column: str, new_column: str
228 ) -> None:
229 for part in self.parts.values():
230 if hasattr(part, "rename_column_references"):
231 part.rename_column_references(table, old_column, new_column)
232
233 def __repr__(self) -> str:
234 return f"<{self.__class__.__name__} {str(self)!r}>"
235
236 def __str__(self) -> str:
237 return self.template % self.parts
238
239
240class Expressions(TableColumns):
241 def __init__(
242 self,
243 table: str,
244 expressions: Any,
245 compiler: SQLCompiler,
246 quote_value: Callable[[Any], str],
247 ) -> None:
248 self.compiler = compiler
249 self.expressions = expressions
250 self.quote_value = quote_value
251 columns = [
252 col.target.column
253 for col in self.compiler.query._gen_cols(iter([self.expressions]))
254 ]
255 super().__init__(table, columns)
256
257 def rename_table_references(self, old_table: str, new_table: str) -> None:
258 if self.table != old_table:
259 return
260 self.expressions = self.expressions.relabeled_clone({old_table: new_table})
261 super().rename_table_references(old_table, new_table)
262
263 def rename_column_references(
264 self, table: str, old_column: str, new_column: str
265 ) -> None:
266 if self.table != table:
267 return
268 expressions = deepcopy(self.expressions)
269 self.columns = []
270 for col in self.compiler.query._gen_cols(iter([expressions])):
271 if col.target.column == old_column:
272 col.target.column = new_column
273 self.columns.append(col.target.column)
274 self.expressions = expressions
275
276 def __str__(self) -> str:
277 sql, params = self.compiler.compile(self.expressions)
278 params = map(self.quote_value, params)
279 return sql % tuple(params)
280
281
282# ##### Schema helper functions #####
283
284
285def _is_relevant_relation(relation: ForeignObjectRel, altered_field: Field) -> bool:
286 """
287 When altering the given field, must constraints on its model from the given
288 relation be temporarily dropped?
289 """
290 from plain.models.fields.related import ManyToManyField
291
292 field = relation.field
293 if isinstance(field, ManyToManyField):
294 # M2M reverse field
295 return False
296 if altered_field.primary_key:
297 # Foreign key constraint on the primary key, which is being altered.
298 return True
299 # ForeignKeyField always targets 'id'
300 return altered_field.name == "id"
301
302
303def _all_related_fields(model: type[Model]) -> list[ForeignObjectRel]:
304 # Related fields must be returned in a deterministic order.
305 return sorted(
306 model._model_meta._get_fields(
307 forward=False,
308 reverse=True,
309 ),
310 key=operator.attrgetter("name"),
311 )
312
313
314def _related_non_m2m_objects(
315 old_field: Field, new_field: Field
316) -> Generator[tuple[ForeignObjectRel, ForeignObjectRel], None, None]:
317 # Filter out m2m objects from reverse relations.
318 # Return (old_relation, new_relation) tuples.
319 related_fields = zip(
320 (
321 obj
322 for obj in _all_related_fields(old_field.model)
323 if _is_relevant_relation(obj, old_field)
324 ),
325 (
326 obj
327 for obj in _all_related_fields(new_field.model)
328 if _is_relevant_relation(obj, new_field)
329 ),
330 )
331 for old_rel, new_rel in related_fields:
332 yield old_rel, new_rel
333 yield from _related_non_m2m_objects(
334 old_rel.remote_field,
335 new_rel.remote_field,
336 )
337
338
339class DatabaseSchemaEditor:
340 """
341 Responsible for emitting schema-changing statements to PostgreSQL - model
342 creation/removal/alteration, field renaming, index management, and so on.
343 """
344
345 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
346 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
347 sql_delete_table = "DROP TABLE %(table)s CASCADE"
348
349 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
350 sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
351 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
352 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
353 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
354 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
355 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
356 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
357 sql_rename_column = (
358 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
359 )
360 # Setting all constraints to IMMEDIATE to allow changing data in the same transaction.
361 sql_update_with_default = (
362 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
363 "; SET CONSTRAINTS ALL IMMEDIATE"
364 )
365
366 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
367 sql_check_constraint = "CHECK (%(check)s)"
368 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
369 sql_constraint = "CONSTRAINT %(name)s %(constraint)s"
370
371 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
372 sql_delete_check = sql_delete_constraint
373
374 sql_create_unique = (
375 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
376 "UNIQUE (%(columns)s)%(deferrable)s"
377 )
378 sql_delete_unique = sql_delete_constraint
379
380 sql_create_fk = (
381 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
382 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
383 )
384 # Setting the constraint to IMMEDIATE to allow changing data in the same transaction.
385 sql_create_column_inline_fk = (
386 "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s"
387 "; SET CONSTRAINTS %(namespace)s%(name)s IMMEDIATE"
388 )
389 # Setting the constraint to IMMEDIATE runs any deferred checks to allow
390 # dropping it in the same transaction.
391 sql_delete_fk = (
392 "SET CONSTRAINTS %(name)s IMMEDIATE; "
393 "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
394 )
395
396 sql_create_index = (
397 "CREATE INDEX %(name)s ON %(table)s%(using)s "
398 "(%(columns)s)%(include)s%(extra)s%(condition)s"
399 )
400 sql_create_index_concurrently = (
401 "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s "
402 "(%(columns)s)%(include)s%(extra)s%(condition)s"
403 )
404 sql_create_unique_index = (
405 "CREATE UNIQUE INDEX %(name)s ON %(table)s "
406 "(%(columns)s)%(include)s%(condition)s"
407 )
408 sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
409 sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
410 sql_delete_index_concurrently = "DROP INDEX CONCURRENTLY IF EXISTS %(name)s"
411
412 sql_create_pk = (
413 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
414 )
415 sql_delete_pk = sql_delete_constraint
416
417 # PostgreSQL IDENTITY column support
418 sql_add_identity = (
419 "ALTER TABLE %(table)s ALTER COLUMN %(column)s ADD "
420 "GENERATED BY DEFAULT AS IDENTITY"
421 )
422 sql_drop_identity = (
423 "ALTER TABLE %(table)s ALTER COLUMN %(column)s DROP IDENTITY IF EXISTS"
424 )
425 sql_alter_sequence_type = "ALTER SEQUENCE IF EXISTS %(sequence)s AS %(type)s"
426 sql_delete_sequence = "DROP SEQUENCE IF EXISTS %(sequence)s CASCADE"
427
428 def __init__(
429 self,
430 connection: DatabaseWrapper,
431 atomic: bool = True,
432 collect_sql: bool = False,
433 ):
434 self.connection = connection
435 self.collect_sql = collect_sql
436 self.atomic_migration = atomic and not collect_sql
437
438 # State-managing methods
439
440 def __enter__(self) -> Self:
441 self.deferred_sql: list[Any] = []
442 self.executed_sql: list[str] = []
443 if self.atomic_migration:
444 self.atomic = atomic()
445 self.atomic.__enter__()
446 return self
447
448 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
449 if exc_type is None:
450 for sql in self.deferred_sql:
451 self.execute(sql)
452 if self.atomic_migration:
453 self.atomic.__exit__(exc_type, exc_value, traceback)
454 self.deferred_sql.clear()
455
456 # Core utility functions
457
458 def execute(
459 self, sql: str | Statement, params: tuple[Any, ...] | list[Any] | None = ()
460 ) -> None:
461 """Execute the given SQL statement, with optional parameters."""
462 # Account for non-string statement objects.
463 sql_str = str(sql)
464
465 # Merge the query client-side, as PostgreSQL won't do it server-side.
466 if params is not None:
467 sql_str = self.connection.compose_sql(sql_str, params)
468 params = None
469
470 # Log the command we're running, then run it
471 logger.debug(
472 "%s; (params %r)", sql_str, params, extra={"params": params, "sql": sql_str}
473 )
474
475 # Track executed SQL for display in migration output
476 self.executed_sql.append(sql_str)
477
478 if self.collect_sql:
479 return
480
481 with self.connection.cursor() as cursor:
482 cursor.execute(sql_str, params)
483
484 def quote_value(self, value: Any) -> str:
485 """
486 Return a quoted version of the value so it's safe to use in an SQL
487 string. This is not safe against injection from user code; it is
488 intended only for use in making SQL scripts or preparing default values
489 (which are not user-defined, so this is safe).
490 """
491 if isinstance(value, str):
492 value = value.replace("%", "%%")
493 return psycopg_sql.quote(value, self.connection.connection)
494
495 def table_sql(self, model: type[Model]) -> tuple[str, list[Any]]:
496 """Take a model and return its table definition."""
497 # Create column SQL, add FK deferreds if needed.
498 column_sqls = []
499 params = []
500 for field in model._model_meta.local_fields:
501 # SQL.
502 definition, extra_params = self.column_sql(model, field)
503 if definition is None:
504 continue
505 # Check constraints can go on the column SQL here.
506 db_params = field.db_parameters()
507 if db_params["check"]:
508 definition += " " + self.sql_check_constraint % db_params
509 # Autoincrement SQL (e.g. GENERATED BY DEFAULT AS IDENTITY).
510 col_type_suffix = field.db_type_suffix()
511 if col_type_suffix:
512 definition += f" {col_type_suffix}"
513 if extra_params:
514 params.extend(extra_params)
515 # PostgreSQL creates FK constraints via deferred ALTER TABLE
516 if isinstance(field, ForeignKeyField) and field.db_constraint:
517 self.deferred_sql.append(
518 self._create_fk_sql(model, field, "_fk_%(to_table)s_%(to_column)s")
519 )
520 # Add the SQL to our big list.
521 column_sqls.append(f"{quote_name(field.column)} {definition}")
522 constraints = [
523 constraint.constraint_sql(model, self)
524 for constraint in model.model_options.constraints
525 ]
526 sql = self.sql_create_table % {
527 "table": quote_name(model.model_options.db_table),
528 "definition": ", ".join(
529 str(constraint)
530 for constraint in (*column_sqls, *constraints)
531 if constraint
532 ),
533 }
534 return sql, params
535
536 # Field <-> database mapping functions
537
538 def _iter_column_sql(
539 self,
540 column_db_type: str,
541 params: list[Any],
542 model: type[Model],
543 field: Field,
544 field_db_params: DbParameters,
545 include_default: bool,
546 ) -> Generator[str, None, None]:
547 yield column_db_type
548 # Work out nullability.
549 null = field.allow_null
550 # Include a default value, if requested.
551 if include_default:
552 default_value = self.effective_default(field)
553 if default_value is not None:
554 yield "DEFAULT %s"
555 params.append(default_value)
556
557 if not null:
558 yield "NOT NULL"
559 else:
560 yield "NULL"
561
562 if field.primary_key:
563 yield "PRIMARY KEY"
564
565 def column_sql(
566 self, model: type[Model], field: Field, include_default: bool = False
567 ) -> tuple[str | None, list[Any] | None]:
568 """
569 Return the column definition for a field. The field must already have
570 had set_attributes_from_name() called.
571 """
572 # Get the column's type and use that as the basis of the SQL.
573 field_db_params = field.db_parameters()
574 column_db_type = field_db_params["type"]
575 # Check for fields that aren't actually columns (e.g. M2M).
576 if column_db_type is None:
577 return None, None
578 params: list[Any] = []
579 return (
580 " ".join(
581 # This appends to the params being returned.
582 self._iter_column_sql(
583 column_db_type,
584 params,
585 model,
586 field,
587 field_db_params,
588 include_default,
589 )
590 ),
591 params,
592 )
593
594 @staticmethod
595 def _effective_default(field: Field) -> Any:
596 # This method allows testing its logic without a connection.
597 if field.has_default():
598 default = field.get_default()
599 elif (
600 not field.allow_null and not field.required and field.empty_strings_allowed
601 ):
602 if field.get_internal_type() == "BinaryField":
603 default = b""
604 else:
605 default = ""
606 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
607 internal_type = field.get_internal_type()
608 if internal_type == "DateTimeField":
609 default = timezone.now()
610 else:
611 default = datetime.now()
612 if internal_type == "DateField":
613 default = default.date()
614 elif internal_type == "TimeField":
615 default = default.time()
616 else:
617 default = None
618 return default
619
620 def effective_default(self, field: Field) -> Any:
621 """Return a field's effective database default value."""
622 return field.get_db_prep_save(self._effective_default(field), self.connection)
623
624 # Actions
625
626 def create_model(self, model: type[Model]) -> None:
627 """
628 Create a table and any accompanying indexes or unique constraints for
629 the given `model`.
630 """
631 sql, params = self.table_sql(model)
632 # Prevent using [] as params, in the case a literal '%' is used in the
633 # definition.
634 self.execute(sql, params or None)
635
636 # Add any field indexes.
637 self.deferred_sql.extend(self._model_indexes_sql(model))
638
639 def delete_model(self, model: type[Model]) -> None:
640 """Delete a model from the database."""
641
642 # Delete the table
643 self.execute(
644 self.sql_delete_table
645 % {
646 "table": quote_name(model.model_options.db_table),
647 }
648 )
649 # Remove all deferred statements referencing the deleted table.
650 for sql in list(self.deferred_sql):
651 if isinstance(sql, Statement) and sql.references_table(
652 model.model_options.db_table
653 ):
654 self.deferred_sql.remove(sql)
655
656 def add_index(
657 self, model: type[Model], index: Index, concurrently: bool = False
658 ) -> None:
659 """Add an index on a model."""
660 self.execute(
661 index.create_sql(model, self, concurrently=concurrently), params=None
662 )
663
664 def remove_index(
665 self, model: type[Model], index: Index, concurrently: bool = False
666 ) -> None:
667 """Remove an index from a model."""
668 self.execute(index.remove_sql(model, self, concurrently=concurrently))
669
670 def rename_index(
671 self, model: type[Model], old_index: Index, new_index: Index
672 ) -> None:
673 self.execute(
674 self._rename_index_sql(model, old_index.name, new_index.name),
675 params=None,
676 )
677
678 def add_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
679 """Add a constraint to a model."""
680 sql = constraint.create_sql(model, self)
681 if sql:
682 # Constraint.create_sql returns interpolated SQL which makes
683 # params=None a necessity to avoid escaping attempts on execution.
684 self.execute(sql, params=None)
685
686 def remove_constraint(self, model: type[Model], constraint: BaseConstraint) -> None:
687 """Remove a constraint from a model."""
688 sql = constraint.remove_sql(model, self)
689 if sql:
690 self.execute(sql)
691
692 def alter_db_table(
693 self, model: type[Model], old_db_table: str, new_db_table: str
694 ) -> None:
695 """Rename the table a model points to."""
696 if old_db_table == new_db_table:
697 return
698 self.execute(
699 self.sql_rename_table
700 % {
701 "old_table": quote_name(old_db_table),
702 "new_table": quote_name(new_db_table),
703 }
704 )
705 # Rename all references to the old table name.
706 for sql in self.deferred_sql:
707 if isinstance(sql, Statement):
708 sql.rename_table_references(old_db_table, new_db_table)
709
710 def add_field(self, model: type[Model], field: Field) -> None:
711 """
712 Create a field on a model. Usually involves adding a column, but may
713 involve adding a table instead (for M2M fields).
714 """
715 # Get the column's definition
716 definition, params = self.column_sql(model, field, include_default=True)
717 # It might not actually have a column behind it
718 if definition is None:
719 return
720 if col_type_suffix := field.db_type_suffix():
721 definition += f" {col_type_suffix}"
722 # Check constraints can go on the column SQL here
723 db_params = field.db_parameters()
724 if db_params["check"]:
725 definition += " " + self.sql_check_constraint % db_params
726 if isinstance(field, ForeignKeyField) and field.db_constraint:
727 # Add FK constraint inline (PostgreSQL always supports this).
728 constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
729 to_table = field.remote_field.model.model_options.db_table
730 field_name = field.remote_field.field_name
731 if field_name is None:
732 raise ValueError("Foreign key field_name cannot be None")
733 to_field = field.remote_field.model._model_meta.get_forward_field(
734 field_name
735 )
736 to_column = to_field.column
737 namespace, _ = split_identifier(model.model_options.db_table)
738 definition += " " + self.sql_create_column_inline_fk % {
739 "name": self._fk_constraint_name(model, field, constraint_suffix),
740 "namespace": f"{quote_name(namespace)}." if namespace else "",
741 "column": quote_name(field.column),
742 "to_table": quote_name(to_table),
743 "to_column": quote_name(to_column),
744 "deferrable": DEFERRABLE_SQL,
745 }
746 # Build the SQL and run it
747 sql = self.sql_create_column % {
748 "table": quote_name(model.model_options.db_table),
749 "column": quote_name(field.column),
750 "definition": definition,
751 }
752 self.execute(sql, params)
753 # Drop the default if we need to
754 # (Plain usually does not use in-database defaults)
755 if self.effective_default(field) is not None:
756 changes_sql, params = self._alter_column_default_sql(
757 model, None, field, drop=True
758 )
759 sql = self.sql_alter_column % {
760 "table": quote_name(model.model_options.db_table),
761 "changes": changes_sql,
762 }
763 self.execute(sql, params)
764 # Add an index, if required
765 self.deferred_sql.extend(self._field_indexes_sql(model, field))
766
767 def remove_field(self, model: type[Model], field: Field) -> None:
768 """
769 Remove a field from a model. Usually involves deleting a column,
770 but for M2Ms may involve deleting a table.
771 """
772 # It might not actually have a column behind it
773 if field.db_parameters()["type"] is None:
774 return
775 # Drop any FK constraints
776 if isinstance(field, RelatedField):
777 fk_names = self._constraint_names(model, [field.column], foreign_key=True)
778 for fk_name in fk_names:
779 self.execute(
780 self._delete_constraint_sql(self.sql_delete_fk, model, fk_name)
781 )
782 # Delete the column
783 sql = self.sql_delete_column % {
784 "table": quote_name(model.model_options.db_table),
785 "column": quote_name(field.column),
786 }
787 self.execute(sql)
788 # Remove all deferred statements referencing the deleted column.
789 for sql in list(self.deferred_sql):
790 if isinstance(sql, Statement) and sql.references_column(
791 model.model_options.db_table, field.column
792 ):
793 self.deferred_sql.remove(sql)
794
795 def alter_field(
796 self,
797 model: type[Model],
798 old_field: Field,
799 new_field: Field,
800 strict: bool = False,
801 ) -> None:
802 """
803 Allow a field's type, uniqueness, nullability, default, column,
804 constraints, etc. to be modified.
805 `old_field` is required to compute the necessary changes.
806 If `strict` is True, raise errors if the old column does not match
807 `old_field` precisely.
808 """
809 if not self._field_should_be_altered(old_field, new_field):
810 return
811 # Ensure this field is even column-based
812 old_db_params = old_field.db_parameters()
813 old_type = old_db_params["type"]
814 new_db_params = new_field.db_parameters()
815 new_type = new_db_params["type"]
816 if (old_type is None and not isinstance(old_field, RelatedField)) or (
817 new_type is None and not isinstance(new_field, RelatedField)
818 ):
819 raise ValueError(
820 f"Cannot alter field {old_field} into {new_field} - they do not properly define "
821 "db_type (are you using a badly-written custom field?)",
822 )
823 elif (
824 old_type is None
825 and new_type is None
826 and isinstance(old_field, RelatedField)
827 and isinstance(old_field.remote_field, ManyToManyRel)
828 and isinstance(new_field, RelatedField)
829 and isinstance(new_field.remote_field, ManyToManyRel)
830 ):
831 # Both sides have through models; this is a no-op.
832 return
833 elif old_type is None or new_type is None:
834 raise ValueError(
835 f"Cannot alter field {old_field} into {new_field} - they are not compatible types "
836 "(you cannot alter to or from M2M fields, or add or remove "
837 "through= on M2M fields)"
838 )
839
840 self._alter_field(
841 model,
842 old_field,
843 new_field,
844 old_type,
845 new_type,
846 old_db_params,
847 new_db_params,
848 strict,
849 )
850
851 def _field_db_check(
852 self, field: Field, field_db_params: DbParameters
853 ) -> str | None:
854 # Always check constraints with the same mocked column name to avoid
855 # recreating constrains when the column is renamed.
856 data = field.db_type_parameters()
857 data["column"] = "__column_name__"
858 try:
859 return DATA_TYPE_CHECK_CONSTRAINTS[field.get_internal_type()] % data
860 except KeyError:
861 return None
862
863 def _field_data_type(
864 self, field: Field
865 ) -> str | None | Callable[[dict[str, Any]], str]:
866 if isinstance(field, RelatedField):
867 return field.rel_db_type()
868 return DATA_TYPES.get(
869 field.get_internal_type(),
870 field.db_type(),
871 )
872
873 def _get_sequence_name(self, table: str, column: str) -> str | None:
874 with self.connection.cursor() as cursor:
875 for sequence in self.connection.get_sequences(cursor, table):
876 if sequence["column"] == column:
877 return sequence["name"]
878 return None
879
880 def _alter_field(
881 self,
882 model: type[Model],
883 old_field: Field,
884 new_field: Field,
885 old_type: str,
886 new_type: str,
887 old_db_params: DbParameters,
888 new_db_params: DbParameters,
889 strict: bool = False,
890 ) -> None:
891 """Perform a "physical" (non-ManyToMany) field update."""
892 # Drop any FK constraints, we'll remake them later
893 fks_dropped = set()
894 if (
895 isinstance(old_field, ForeignKeyField)
896 and old_field.db_constraint
897 and self._field_should_be_altered(
898 old_field,
899 new_field,
900 )
901 ):
902 fk_names = self._constraint_names(
903 model, [old_field.column], foreign_key=True
904 )
905 if strict and len(fk_names) != 1:
906 raise ValueError(
907 f"Found wrong number ({len(fk_names)}) of foreign key constraints for {model.model_options.db_table}.{old_field.column}"
908 )
909 for fk_name in fk_names:
910 fks_dropped.add((old_field.column,))
911 self.execute(
912 self._delete_constraint_sql(self.sql_delete_fk, model, fk_name)
913 )
914 # Has unique been removed?
915 if old_field.primary_key and (
916 not new_field.primary_key
917 or self._field_became_primary_key(old_field, new_field)
918 ):
919 # Find the unique constraint for this field
920 meta_constraint_names = {
921 constraint.name for constraint in model.model_options.constraints
922 }
923 constraint_names = self._constraint_names(
924 model,
925 [old_field.column],
926 unique=True,
927 primary_key=False,
928 exclude=meta_constraint_names,
929 )
930 if strict and len(constraint_names) != 1:
931 raise ValueError(
932 f"Found wrong number ({len(constraint_names)}) of unique constraints for {model.model_options.db_table}.{old_field.column}"
933 )
934 for constraint_name in constraint_names:
935 sql = self._delete_unique_sql(model, constraint_name)
936 if sql is not None:
937 self.execute(sql)
938 # Drop incoming FK constraints if the field is a primary key or unique,
939 # which might be a to_field target, and things are going to change.
940 drop_foreign_keys = (old_field.primary_key and new_field.primary_key) and (
941 old_type != new_type
942 )
943 if drop_foreign_keys:
944 # '_model_meta.related_field' also contains M2M reverse fields, these
945 # will be filtered out
946 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
947 rel_fk_names = self._constraint_names(
948 new_rel.related_model, [new_rel.field.column], foreign_key=True
949 )
950 for fk_name in rel_fk_names:
951 self.execute(
952 self._delete_constraint_sql(
953 self.sql_delete_fk, new_rel.related_model, fk_name
954 )
955 )
956 # Removed an index? (no strict check, as multiple indexes are possible)
957 # Remove indexes if db_index switched to False or a unique constraint
958 # will now be used in lieu of an index. The following lines from the
959 # truth table show all True cases; the rest are False:
960 #
961 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
962 # ------------------------------------------------------------------------------
963 # True | False | False | False
964 # True | False | False | True
965 # True | False | True | True
966 if (
967 isinstance(old_field, ForeignKeyField)
968 and old_field.db_index
969 and not old_field.primary_key
970 and (
971 not (isinstance(new_field, ForeignKeyField) and new_field.db_index)
972 or new_field.primary_key
973 )
974 ):
975 # Find the index for this field
976 meta_index_names = {index.name for index in model.model_options.indexes}
977 # Retrieve only BTREE indexes since this is what's created with
978 # db_index=True.
979 index_names = self._constraint_names(
980 model,
981 [old_field.column],
982 index=True,
983 type_=Index.suffix,
984 exclude=meta_index_names,
985 )
986 for index_name in index_names:
987 # The only way to check if an index was created with
988 # db_index=True or with Index(['field'], name='foo')
989 # is to look at its name (refs #28053).
990 self.execute(self._delete_index_sql(model, index_name))
991 # Change check constraints?
992 old_db_check = self._field_db_check(old_field, old_db_params)
993 new_db_check = self._field_db_check(new_field, new_db_params)
994 if old_db_check != new_db_check and old_db_check:
995 meta_constraint_names = {
996 constraint.name for constraint in model.model_options.constraints
997 }
998 constraint_names = self._constraint_names(
999 model,
1000 [old_field.column],
1001 check=True,
1002 exclude=meta_constraint_names,
1003 )
1004 if strict and len(constraint_names) != 1:
1005 raise ValueError(
1006 f"Found wrong number ({len(constraint_names)}) of check constraints for {model.model_options.db_table}.{old_field.column}"
1007 )
1008 for constraint_name in constraint_names:
1009 sql = self._delete_constraint_sql(
1010 self.sql_delete_check, model, constraint_name
1011 )
1012 if sql is not None:
1013 self.execute(sql)
1014 # Have they renamed the column?
1015 if old_field.column != new_field.column:
1016 self.execute(
1017 self._rename_field_sql(
1018 model.model_options.db_table, old_field, new_field, new_type
1019 )
1020 )
1021 # Rename all references to the renamed column.
1022 for sql in self.deferred_sql:
1023 if isinstance(sql, Statement):
1024 sql.rename_column_references(
1025 model.model_options.db_table, old_field.column, new_field.column
1026 )
1027 # Next, start accumulating actions to do
1028 actions = []
1029 null_actions = []
1030 post_actions = []
1031 # Type suffix change? (e.g. auto increment).
1032 old_type_suffix = old_field.db_type_suffix()
1033 new_type_suffix = new_field.db_type_suffix()
1034 # Type change?
1035 if old_type != new_type or old_type_suffix != new_type_suffix:
1036 fragment, other_actions = self._alter_column_type_sql(
1037 model, old_field, new_field, new_type
1038 )
1039 actions.append(fragment)
1040 post_actions.extend(other_actions)
1041 # When changing a column NULL constraint to NOT NULL with a given
1042 # default value, we need to perform 4 steps:
1043 # 1. Add a default for new incoming writes
1044 # 2. Update existing NULL rows with new default
1045 # 3. Replace NULL constraint with NOT NULL
1046 # 4. Drop the default again.
1047 # Default change?
1048 needs_database_default = False
1049 if old_field.allow_null and not new_field.allow_null:
1050 old_default = self.effective_default(old_field)
1051 new_default = self.effective_default(new_field)
1052 if old_default != new_default and new_default is not None:
1053 needs_database_default = True
1054 actions.append(
1055 self._alter_column_default_sql(model, old_field, new_field)
1056 )
1057 # Nullability change?
1058 if old_field.allow_null != new_field.allow_null:
1059 fragment = self._alter_column_null_sql(model, old_field, new_field)
1060 if fragment:
1061 null_actions.append(fragment)
1062 # Only if we have a default and there is a change from NULL to NOT NULL
1063 four_way_default_alteration = new_field.has_default() and (
1064 old_field.allow_null and not new_field.allow_null
1065 )
1066 if actions or null_actions:
1067 if not four_way_default_alteration:
1068 # If we don't have to do a 4-way default alteration we can
1069 # directly run a (NOT) NULL alteration
1070 actions += null_actions
1071 # Combine actions together
1072 if actions:
1073 sql, params = tuple(zip(*actions))
1074 actions = [(", ".join(sql), sum(params, []))]
1075 # Apply those actions
1076 for sql, params in actions:
1077 self.execute(
1078 self.sql_alter_column
1079 % {
1080 "table": quote_name(model.model_options.db_table),
1081 "changes": sql,
1082 },
1083 params,
1084 )
1085 if four_way_default_alteration:
1086 # Update existing rows with default value
1087 self.execute(
1088 self.sql_update_with_default
1089 % {
1090 "table": quote_name(model.model_options.db_table),
1091 "column": quote_name(new_field.column),
1092 "default": "%s",
1093 },
1094 [new_default],
1095 )
1096 # Since we didn't run a NOT NULL change before we need to do it
1097 # now
1098 for sql, params in null_actions:
1099 self.execute(
1100 self.sql_alter_column
1101 % {
1102 "table": quote_name(model.model_options.db_table),
1103 "changes": sql,
1104 },
1105 params,
1106 )
1107 if post_actions:
1108 for sql, params in post_actions:
1109 self.execute(sql, params)
1110 # If primary_key changed to False, delete the primary key constraint.
1111 if old_field.primary_key and not new_field.primary_key:
1112 self._delete_primary_key(model, strict)
1113
1114 # Added an index? Add an index if db_index switched to True or a unique
1115 # constraint will no longer be used in lieu of an index. The following
1116 # lines from the truth table show all True cases; the rest are False:
1117 #
1118 # old_field.db_index | old_field.primary_key | new_field.db_index | new_field.primary_key
1119 # ------------------------------------------------------------------------------
1120 # False | False | True | False
1121 # False | True | True | False
1122 # True | True | True | False
1123 if (
1124 (
1125 not (isinstance(old_field, ForeignKeyField) and old_field.db_index)
1126 or old_field.primary_key
1127 )
1128 and isinstance(new_field, ForeignKeyField)
1129 and new_field.db_index
1130 and not new_field.primary_key
1131 ):
1132 self.execute(self._create_index_sql(model, fields=[new_field]))
1133 # Type alteration on primary key? Then we need to alter the column
1134 # referring to us.
1135 rels_to_update = []
1136 if drop_foreign_keys:
1137 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1138 # Changed to become primary key?
1139 if self._field_became_primary_key(old_field, new_field):
1140 # Make the new one
1141 self.execute(self._create_primary_key_sql(model, new_field))
1142 # Update all referencing columns
1143 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1144 # Handle our type alters on the other end of rels from the PK stuff above
1145 for old_rel, new_rel in rels_to_update:
1146 rel_db_params = new_rel.field.db_parameters()
1147 rel_type = rel_db_params["type"]
1148 fragment, other_actions = self._alter_column_type_sql(
1149 new_rel.related_model,
1150 old_rel.field,
1151 new_rel.field,
1152 rel_type,
1153 )
1154 self.execute(
1155 self.sql_alter_column
1156 % {
1157 "table": quote_name(new_rel.related_model.model_options.db_table),
1158 "changes": fragment[0],
1159 },
1160 fragment[1],
1161 )
1162 for sql, params in other_actions:
1163 self.execute(sql, params)
1164 # Does it have a foreign key?
1165 if (
1166 isinstance(new_field, ForeignKeyField)
1167 and (
1168 fks_dropped
1169 or not isinstance(old_field, ForeignKeyField)
1170 or not old_field.db_constraint
1171 )
1172 and new_field.db_constraint
1173 ):
1174 self.execute(
1175 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1176 )
1177 # Rebuild FKs that pointed to us if we previously had to drop them
1178 if drop_foreign_keys:
1179 for _, rel in rels_to_update:
1180 if isinstance(rel.field, ForeignKeyField) and rel.field.db_constraint:
1181 self.execute(
1182 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1183 )
1184 # Does it have check constraints we need to add?
1185 if old_db_check != new_db_check and new_db_check:
1186 constraint_name = self._create_index_name(
1187 model.model_options.db_table, [new_field.column], suffix="_check"
1188 )
1189 new_check = new_db_params["check"]
1190 assert new_check is not None # Guaranteed by new_db_check check above
1191 sql = self._create_check_sql(model, constraint_name, new_check)
1192 if sql is not None:
1193 self.execute(sql)
1194 # Drop the default if we need to
1195 # (Plain usually does not use in-database defaults)
1196 if needs_database_default:
1197 changes_sql, params = self._alter_column_default_sql(
1198 model, old_field, new_field, drop=True
1199 )
1200 sql = self.sql_alter_column % {
1201 "table": quote_name(model.model_options.db_table),
1202 "changes": changes_sql,
1203 }
1204 self.execute(sql, params)
1205
1206 # Added an index? Create any PostgreSQL-specific indexes.
1207 if (
1208 not (
1209 (isinstance(old_field, ForeignKeyField) and old_field.db_index)
1210 or old_field.primary_key
1211 )
1212 and isinstance(new_field, ForeignKeyField)
1213 and new_field.db_index
1214 ) or (not old_field.primary_key and new_field.primary_key):
1215 like_index_statement = self._create_like_index_sql(model, new_field)
1216 if like_index_statement is not None:
1217 self.execute(like_index_statement)
1218
1219 # Removed an index? Drop any PostgreSQL-specific indexes.
1220 if old_field.primary_key and not (
1221 (isinstance(new_field, ForeignKeyField) and new_field.db_index)
1222 or new_field.primary_key
1223 ):
1224 index_to_remove = self._create_index_name(
1225 model.model_options.db_table, [old_field.column], suffix="_like"
1226 )
1227 self.execute(self._delete_index_sql(model, index_to_remove))
1228
1229 def _alter_column_null_sql(
1230 self, model: type[Model], old_field: Field, new_field: Field
1231 ) -> tuple[str, list[Any]]:
1232 """
1233 Return a (sql, params) fragment to set a column to null or non-null
1234 as required by new_field.
1235 """
1236 new_db_params = new_field.db_parameters()
1237 sql = (
1238 self.sql_alter_column_null
1239 if new_field.allow_null
1240 else self.sql_alter_column_not_null
1241 )
1242 return (
1243 sql
1244 % {
1245 "column": quote_name(new_field.column),
1246 "type": new_db_params["type"],
1247 },
1248 [],
1249 )
1250
1251 def _alter_column_default_sql(
1252 self,
1253 model: type[Model],
1254 old_field: Field | None,
1255 new_field: Field,
1256 drop: bool = False,
1257 ) -> tuple[str, list[Any]]:
1258 """
1259 Return a (sql, params) fragment to add or drop (depending on the drop
1260 argument) a default to new_field's column.
1261 """
1262 new_default = self.effective_default(new_field)
1263 params: list[Any] = [] if drop else [new_default]
1264
1265 new_db_params = new_field.db_parameters()
1266 if drop:
1267 # PostgreSQL uses the same SQL for nullable and non-nullable columns
1268 sql = self.sql_alter_column_no_default
1269 else:
1270 sql = self.sql_alter_column_default
1271 return (
1272 sql
1273 % {
1274 "column": quote_name(new_field.column),
1275 "type": new_db_params["type"],
1276 "default": "%s",
1277 },
1278 params,
1279 )
1280
1281 def _alter_column_type_sql(
1282 self,
1283 model: type[Model],
1284 old_field: Field,
1285 new_field: Field,
1286 new_type: str,
1287 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1288 """
1289 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1290 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1291 run once the field is altered. Handles IDENTITY column transitions.
1292 """
1293 # Drop indexes on varchar/text/citext columns that are changing to a
1294 # different type.
1295 old_db_params = old_field.db_parameters()
1296 old_type = old_db_params["type"]
1297 assert old_type is not None, "old_type cannot be None for primary key field"
1298 if old_field.primary_key and (
1299 (old_type.startswith("varchar") and not new_type.startswith("varchar"))
1300 or (old_type.startswith("text") and not new_type.startswith("text"))
1301 or (old_type.startswith("citext") and not new_type.startswith("citext"))
1302 ):
1303 index_name = self._create_index_name(
1304 model.model_options.db_table, [old_field.column], suffix="_like"
1305 )
1306 self.execute(self._delete_index_sql(model, index_name))
1307
1308 self.sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
1309 # Cast when data type changed.
1310 if self._field_data_type(old_field) != self._field_data_type(new_field):
1311 self.sql_alter_column_type += " USING %(column)s::%(type)s"
1312 new_internal_type = new_field.get_internal_type()
1313 old_internal_type = old_field.get_internal_type()
1314 # Make ALTER TYPE with IDENTITY make sense.
1315 table = strip_quotes(model.model_options.db_table)
1316 auto_field_types = {"PrimaryKeyField"}
1317 old_is_auto = old_internal_type in auto_field_types
1318 new_is_auto = new_internal_type in auto_field_types
1319 if new_is_auto and not old_is_auto:
1320 column = strip_quotes(new_field.column)
1321 return (
1322 (
1323 self.sql_alter_column_type
1324 % {
1325 "column": quote_name(column),
1326 "type": new_type,
1327 },
1328 [],
1329 ),
1330 [
1331 (
1332 self.sql_add_identity
1333 % {
1334 "table": quote_name(table),
1335 "column": quote_name(column),
1336 },
1337 [],
1338 ),
1339 ],
1340 )
1341 elif old_is_auto and not new_is_auto:
1342 # Drop IDENTITY if exists (pre-Plain 4.1 serial columns don't have
1343 # it).
1344 self.execute(
1345 self.sql_drop_identity
1346 % {
1347 "table": quote_name(table),
1348 "column": quote_name(strip_quotes(new_field.column)),
1349 }
1350 )
1351 column = strip_quotes(new_field.column)
1352 fragment, _ = self._alter_column_type_sql_base(
1353 model, old_field, new_field, new_type
1354 )
1355 # Drop the sequence if exists (Plain 4.1+ identity columns don't
1356 # have it).
1357 other_actions: list[tuple[str, list[Any]]] = []
1358 if sequence_name := self._get_sequence_name(table, column):
1359 other_actions = [
1360 (
1361 self.sql_delete_sequence
1362 % {
1363 "sequence": quote_name(sequence_name),
1364 },
1365 [],
1366 )
1367 ]
1368 return fragment, other_actions
1369 elif new_is_auto and old_is_auto and old_internal_type != new_internal_type:
1370 fragment, _ = self._alter_column_type_sql_base(
1371 model, old_field, new_field, new_type
1372 )
1373 column = strip_quotes(new_field.column)
1374 db_types = {"PrimaryKeyField": "bigint"}
1375 # Alter the sequence type if exists (Plain 4.1+ identity columns
1376 # don't have it).
1377 other_actions: list[tuple[str, list[Any]]] = []
1378 if sequence_name := self._get_sequence_name(table, column):
1379 other_actions = [
1380 (
1381 self.sql_alter_sequence_type
1382 % {
1383 "sequence": quote_name(sequence_name),
1384 "type": db_types[new_internal_type],
1385 },
1386 [],
1387 ),
1388 ]
1389 return fragment, other_actions
1390 else:
1391 return self._alter_column_type_sql_base(
1392 model, old_field, new_field, new_type
1393 )
1394
1395 def _alter_column_type_sql_base(
1396 self,
1397 model: type[Model],
1398 old_field: Field,
1399 new_field: Field,
1400 new_type: str,
1401 ) -> tuple[tuple[str, list[Any]], list[tuple[str, list[Any]]]]:
1402 """Base implementation of _alter_column_type_sql without IDENTITY handling."""
1403 return (
1404 (
1405 self.sql_alter_column_type
1406 % {
1407 "column": quote_name(new_field.column),
1408 "type": new_type,
1409 },
1410 [],
1411 ),
1412 [],
1413 )
1414
1415 def _create_index_name(
1416 self, table_name: str, column_names: list[str], suffix: str = ""
1417 ) -> str:
1418 """
1419 Generate a unique name for an index/unique constraint.
1420
1421 The name is divided into 3 parts: the table name, the column names,
1422 and a unique digest and suffix.
1423 """
1424 _, table_name = split_identifier(table_name)
1425 hash_suffix_part = (
1426 f"{names_digest(table_name, *column_names, length=8)}{suffix}"
1427 )
1428 max_length = MAX_NAME_LENGTH
1429 # If everything fits into max_length, use that name.
1430 index_name = "{}_{}_{}".format(
1431 table_name, "_".join(column_names), hash_suffix_part
1432 )
1433 if len(index_name) <= max_length:
1434 return index_name
1435 # Shorten a long suffix.
1436 if len(hash_suffix_part) > max_length / 3:
1437 hash_suffix_part = hash_suffix_part[: max_length // 3]
1438 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1439 index_name = "{}_{}_{}".format(
1440 table_name[:other_length],
1441 "_".join(column_names)[:other_length],
1442 hash_suffix_part,
1443 )
1444 # Prepend D if needed to prevent the name from starting with an
1445 # underscore or a number.
1446 if index_name[0] == "_" or index_name[0].isdigit():
1447 index_name = f"D{index_name[:-1]}"
1448 return index_name
1449
1450 def _index_include_sql(
1451 self, model: type[Model], columns: list[str] | None
1452 ) -> str | Statement:
1453 if not columns:
1454 return ""
1455 return Statement(
1456 " INCLUDE (%(columns)s)",
1457 columns=Columns(model.model_options.db_table, columns),
1458 )
1459
1460 def _create_index_sql(
1461 self,
1462 model: type[Model],
1463 *,
1464 fields: list[Field] | None = None,
1465 name: str | None = None,
1466 suffix: str = "",
1467 using: str = "",
1468 col_suffixes: tuple[str, ...] = (),
1469 sql: str | None = None,
1470 opclasses: tuple[str, ...] = (),
1471 condition: str | None = None,
1472 concurrently: bool = False,
1473 include: list[str] | None = None,
1474 expressions: Any = None,
1475 ) -> Statement:
1476 """
1477 Return the SQL statement to create the index for one or several fields
1478 or expressions. `sql` can be specified if the syntax differs from the
1479 standard (GIS indexes, ...).
1480 """
1481 fields = fields or []
1482 expressions = expressions or []
1483 compiler = Query(model, alias_cols=False).get_compiler()
1484 columns = [field.column for field in fields]
1485 if sql is None:
1486 sql = (
1487 self.sql_create_index
1488 if not concurrently
1489 else self.sql_create_index_concurrently
1490 )
1491 table = model.model_options.db_table
1492
1493 def create_index_name(*args: Any, **kwargs: Any) -> str:
1494 nonlocal name
1495 if name is None:
1496 name = self._create_index_name(*args, **kwargs)
1497 return quote_name(name)
1498
1499 return Statement(
1500 sql,
1501 table=Table(table),
1502 name=IndexName(table, columns, suffix, create_index_name),
1503 using=using,
1504 columns=(
1505 self._index_columns(table, columns, col_suffixes, opclasses)
1506 if columns
1507 else Expressions(table, expressions, compiler, self.quote_value)
1508 ),
1509 extra="",
1510 condition=(" WHERE " + condition if condition else ""),
1511 include=self._index_include_sql(model, include),
1512 )
1513
1514 def _delete_index_sql(
1515 self,
1516 model: type[Model],
1517 name: str,
1518 sql: str | None = None,
1519 concurrently: bool = False,
1520 ) -> Statement:
1521 if sql is None:
1522 sql = (
1523 self.sql_delete_index_concurrently
1524 if concurrently
1525 else self.sql_delete_index
1526 )
1527 return Statement(
1528 sql,
1529 table=Table(model.model_options.db_table),
1530 name=quote_name(name),
1531 )
1532
1533 def _rename_index_sql(
1534 self, model: type[Model], old_name: str, new_name: str
1535 ) -> Statement:
1536 return Statement(
1537 self.sql_rename_index,
1538 table=Table(model.model_options.db_table),
1539 old_name=quote_name(old_name),
1540 new_name=quote_name(new_name),
1541 )
1542
1543 def _index_columns(
1544 self,
1545 table: str,
1546 columns: list[str],
1547 col_suffixes: tuple[str, ...],
1548 opclasses: tuple[str, ...],
1549 ) -> Columns:
1550 return Columns(
1551 table,
1552 columns,
1553 col_suffixes=col_suffixes,
1554 opclasses=opclasses,
1555 )
1556
1557 def _model_indexes_sql(self, model: type[Model]) -> list[Statement | None]:
1558 """
1559 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1560 """
1561 output: list[Statement | None] = []
1562 for field in model._model_meta.local_fields:
1563 output.extend(self._field_indexes_sql(model, field))
1564
1565 for index in model.model_options.indexes:
1566 if not index.contains_expressions:
1567 output.append(index.create_sql(model, self))
1568 return output
1569
1570 def _field_indexes_sql(self, model: type[Model], field: Field) -> list[Statement]:
1571 """
1572 Return a list of all index SQL statements for the specified field.
1573 """
1574 output: list[Statement] = []
1575 if self._field_should_be_indexed(model, field):
1576 output.append(self._create_index_sql(model, fields=[field]))
1577 # Add LIKE index for varchar/text primary keys
1578 like_index_statement = self._create_like_index_sql(model, field)
1579 if like_index_statement is not None:
1580 output.append(like_index_statement)
1581 return output
1582
1583 def _create_like_index_sql(
1584 self, model: type[Model], field: Field
1585 ) -> Statement | None:
1586 """
1587 Return the statement to create an index with varchar operator pattern
1588 when the column type is 'varchar' or 'text', otherwise return None.
1589 """
1590 db_type = field.db_type()
1591 if db_type is not None and field.primary_key:
1592 # Fields with database column types of `varchar` and `text` need
1593 # a second index that specifies their operator class, which is
1594 # needed when performing correct LIKE queries outside the
1595 # C locale. See #12234.
1596 #
1597 # The same doesn't apply to array fields such as varchar[size]
1598 # and text[size], so skip them.
1599 if "[" in db_type:
1600 return None
1601 if db_type.startswith("varchar"):
1602 return self._create_index_sql(
1603 model,
1604 fields=[field],
1605 suffix="_like",
1606 opclasses=("varchar_pattern_ops",),
1607 )
1608 elif db_type.startswith("text"):
1609 return self._create_index_sql(
1610 model,
1611 fields=[field],
1612 suffix="_like",
1613 opclasses=("text_pattern_ops",),
1614 )
1615 return None
1616
1617 def _field_should_be_altered(
1618 self, old_field: Field, new_field: Field, ignore: set[str] | None = None
1619 ) -> bool:
1620 ignore = ignore or set()
1621 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1622 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1623 # Don't alter when:
1624 # - changing only a field name
1625 # - changing an attribute that doesn't affect the schema
1626 # - changing an attribute in the provided set of ignored attributes
1627 for attr in ignore.union(old_field.non_db_attrs):
1628 old_kwargs.pop(attr, None)
1629 for attr in ignore.union(new_field.non_db_attrs):
1630 new_kwargs.pop(attr, None)
1631 return quote_name(old_field.column) != quote_name(new_field.column) or (
1632 old_path,
1633 old_args,
1634 old_kwargs,
1635 ) != (new_path, new_args, new_kwargs)
1636
1637 def _field_should_be_indexed(self, model: type[Model], field: Field) -> bool:
1638 if isinstance(field, ForeignKeyField):
1639 return bool(field.remote_field) and field.db_index and not field.primary_key
1640 return False
1641
1642 def _field_became_primary_key(self, old_field: Field, new_field: Field) -> bool:
1643 return not old_field.primary_key and new_field.primary_key
1644
1645 def _rename_field_sql(
1646 self, table: str, old_field: Field, new_field: Field, new_type: str
1647 ) -> str:
1648 return self.sql_rename_column % {
1649 "table": quote_name(table),
1650 "old_column": quote_name(old_field.column),
1651 "new_column": quote_name(new_field.column),
1652 "type": new_type,
1653 }
1654
1655 def _create_fk_sql(
1656 self, model: type[Model], field: ForeignKeyField, suffix: str
1657 ) -> Statement:
1658 table = Table(model.model_options.db_table)
1659 name = self._fk_constraint_name(model, field, suffix)
1660 column = Columns(model.model_options.db_table, [field.column])
1661 to_table = Table(field.target_field.model.model_options.db_table)
1662 to_column = Columns(
1663 field.target_field.model.model_options.db_table,
1664 [field.target_field.column],
1665 )
1666 deferrable = DEFERRABLE_SQL
1667 return Statement(
1668 self.sql_create_fk,
1669 table=table,
1670 name=name,
1671 column=column,
1672 to_table=to_table,
1673 to_column=to_column,
1674 deferrable=deferrable,
1675 )
1676
1677 def _fk_constraint_name(
1678 self, model: type[Model], field: ForeignKeyField, suffix: str
1679 ) -> ForeignKeyName:
1680 def create_fk_name(*args: Any, **kwargs: Any) -> str:
1681 return quote_name(self._create_index_name(*args, **kwargs))
1682
1683 return ForeignKeyName(
1684 model.model_options.db_table,
1685 [field.column],
1686 split_identifier(field.target_field.model.model_options.db_table)[1],
1687 [field.target_field.column],
1688 suffix,
1689 create_fk_name,
1690 )
1691
1692 def _deferrable_constraint_sql(self, deferrable: Deferrable | None) -> str:
1693 if deferrable is None:
1694 return ""
1695 if deferrable == Deferrable.DEFERRED:
1696 return " DEFERRABLE INITIALLY DEFERRED"
1697 if deferrable == Deferrable.IMMEDIATE:
1698 return " DEFERRABLE INITIALLY IMMEDIATE"
1699 return ""
1700
1701 def _unique_sql(
1702 self,
1703 model: type[Model],
1704 fields: Iterable[Field],
1705 name: str,
1706 condition: str | None = None,
1707 deferrable: Deferrable | None = None,
1708 include: list[str] | None = None,
1709 opclasses: tuple[str, ...] | None = None,
1710 expressions: Any = None,
1711 ) -> str | None:
1712 if condition or include or opclasses or expressions:
1713 # Databases support conditional, covering, and functional unique
1714 # constraints via a unique index.
1715 sql = self._create_unique_sql(
1716 model,
1717 fields,
1718 name=name,
1719 condition=condition,
1720 include=include,
1721 opclasses=opclasses,
1722 expressions=expressions,
1723 )
1724 if sql:
1725 self.deferred_sql.append(sql)
1726 return None
1727 constraint = self.sql_unique_constraint % {
1728 "columns": ", ".join([quote_name(field.column) for field in fields]),
1729 "deferrable": self._deferrable_constraint_sql(deferrable),
1730 }
1731 return self.sql_constraint % {
1732 "name": quote_name(name),
1733 "constraint": constraint,
1734 }
1735
1736 def _create_unique_sql(
1737 self,
1738 model: type[Model],
1739 fields: Iterable[Field],
1740 name: str | None = None,
1741 condition: str | None = None,
1742 deferrable: Deferrable | None = None,
1743 include: list[str] | None = None,
1744 opclasses: tuple[str, ...] | None = None,
1745 expressions: Any = None,
1746 ) -> Statement | None:
1747 compiler = Query(model, alias_cols=False).get_compiler()
1748 table = model.model_options.db_table
1749 columns = [field.column for field in fields]
1750 constraint_name: IndexName | str
1751 if name is None:
1752 constraint_name = self._unique_constraint_name(table, columns, quote=True)
1753 else:
1754 constraint_name = quote_name(name)
1755 if condition or include or opclasses or expressions:
1756 sql = self.sql_create_unique_index
1757 else:
1758 sql = self.sql_create_unique
1759 if columns:
1760 columns_obj: Columns | Expressions = self._index_columns(
1761 table, columns, col_suffixes=(), opclasses=opclasses or ()
1762 )
1763 else:
1764 columns_obj = Expressions(table, expressions, compiler, self.quote_value)
1765 return Statement(
1766 sql,
1767 table=Table(table),
1768 name=constraint_name,
1769 columns=columns_obj,
1770 condition=(" WHERE " + condition if condition else ""),
1771 deferrable=self._deferrable_constraint_sql(deferrable),
1772 include=self._index_include_sql(model, include),
1773 )
1774
1775 def _unique_constraint_name(
1776 self, table: str, columns: list[str], quote: bool = True
1777 ) -> IndexName | str:
1778 if quote:
1779
1780 def create_unique_name(*args: Any, **kwargs: Any) -> str:
1781 return quote_name(self._create_index_name(*args, **kwargs))
1782
1783 else:
1784 create_unique_name = self._create_index_name
1785
1786 return IndexName(table, columns, "_uniq", create_unique_name)
1787
1788 def _delete_unique_sql(
1789 self,
1790 model: type[Model],
1791 name: str,
1792 condition: str | None = None,
1793 deferrable: Deferrable | None = None,
1794 include: list[str] | None = None,
1795 opclasses: tuple[str, ...] | None = None,
1796 expressions: Any = None,
1797 ) -> Statement:
1798 if condition or include or opclasses or expressions:
1799 sql = self.sql_delete_index
1800 else:
1801 sql = self.sql_delete_unique
1802 return self._delete_constraint_sql(sql, model, name)
1803
1804 def _check_sql(self, name: str, check: str) -> str:
1805 return self.sql_constraint % {
1806 "name": quote_name(name),
1807 "constraint": self.sql_check_constraint % {"check": check},
1808 }
1809
1810 def _create_check_sql(self, model: type[Model], name: str, check: str) -> Statement:
1811 return Statement(
1812 self.sql_create_check,
1813 table=Table(model.model_options.db_table),
1814 name=quote_name(name),
1815 check=check,
1816 )
1817
1818 def _delete_constraint_sql(
1819 self, template: str, model: type[Model], name: str
1820 ) -> Statement:
1821 return Statement(
1822 template,
1823 table=Table(model.model_options.db_table),
1824 name=quote_name(name),
1825 )
1826
1827 def _constraint_names(
1828 self,
1829 model: type[Model],
1830 column_names: list[str] | None = None,
1831 unique: bool | None = None,
1832 primary_key: bool | None = None,
1833 index: bool | None = None,
1834 foreign_key: bool | None = None,
1835 check: bool | None = None,
1836 type_: str | None = None,
1837 exclude: set[str] | None = None,
1838 ) -> list[str]:
1839 """Return all constraint names matching the columns and conditions."""
1840 with self.connection.cursor() as cursor:
1841 constraints = self.connection.get_constraints(
1842 cursor, model.model_options.db_table
1843 )
1844 result: list[str] = []
1845 for name, infodict in constraints.items():
1846 if column_names is None or column_names == infodict["columns"]:
1847 if unique is not None and infodict["unique"] != unique:
1848 continue
1849 if primary_key is not None and infodict["primary_key"] != primary_key:
1850 continue
1851 if index is not None and infodict["index"] != index:
1852 continue
1853 if check is not None and infodict["check"] != check:
1854 continue
1855 if foreign_key is not None and not infodict["foreign_key"]:
1856 continue
1857 if type_ is not None and infodict["type"] != type_:
1858 continue
1859 if not exclude or name not in exclude:
1860 result.append(name)
1861 return result
1862
1863 def _delete_primary_key(self, model: type[Model], strict: bool = False) -> None:
1864 constraint_names = self._constraint_names(model, primary_key=True)
1865 if strict and len(constraint_names) != 1:
1866 raise ValueError(
1867 f"Found wrong number ({len(constraint_names)}) of PK constraints for {model.model_options.db_table}"
1868 )
1869 for constraint_name in constraint_names:
1870 self.execute(self._delete_primary_key_sql(model, constraint_name))
1871
1872 def _create_primary_key_sql(self, model: type[Model], field: Field) -> Statement:
1873 return Statement(
1874 self.sql_create_pk,
1875 table=Table(model.model_options.db_table),
1876 name=quote_name(
1877 self._create_index_name(
1878 model.model_options.db_table, [field.column], suffix="_pk"
1879 )
1880 ),
1881 columns=Columns(model.model_options.db_table, [field.column]),
1882 )
1883
1884 def _delete_primary_key_sql(self, model: type[Model], name: str) -> Statement:
1885 return self._delete_constraint_sql(self.sql_delete_pk, model, name)