1import logging
2import operator
3from datetime import datetime
4
5from plain.models.backends.ddl_references import (
6 Columns,
7 Expressions,
8 ForeignKeyName,
9 IndexName,
10 Statement,
11 Table,
12)
13from plain.models.backends.utils import names_digest, split_identifier, truncate_name
14from plain.models.constraints import Deferrable
15from plain.models.indexes import Index
16from plain.models.sql import Query
17from plain.models.transaction import TransactionManagementError, atomic
18from plain.runtime import settings
19from plain.utils import timezone
20
21logger = logging.getLogger("plain.models.backends.schema")
22
23
24def _is_relevant_relation(relation, altered_field):
25 """
26 When altering the given field, must constraints on its model from the given
27 relation be temporarily dropped?
28 """
29 field = relation.field
30 if field.many_to_many:
31 # M2M reverse field
32 return False
33 if altered_field.primary_key and field.to_fields == [None]:
34 # Foreign key constraint on the primary key, which is being altered.
35 return True
36 # Is the constraint targeting the field being altered?
37 return altered_field.name in field.to_fields
38
39
40def _all_related_fields(model):
41 # Related fields must be returned in a deterministic order.
42 return sorted(
43 model._meta._get_fields(
44 forward=False,
45 reverse=True,
46 include_hidden=True,
47 include_parents=False,
48 ),
49 key=operator.attrgetter("name"),
50 )
51
52
def _related_non_m2m_objects(old_field, new_field):
    """
    Yield (old_relation, new_relation) pairs of relevant (non-M2M) reverse
    relations, recursing through each relation's own remote field.
    """
    old_side = (
        rel
        for rel in _all_related_fields(old_field.model)
        if _is_relevant_relation(rel, old_field)
    )
    new_side = (
        rel
        for rel in _all_related_fields(new_field.model)
        if _is_relevant_relation(rel, new_field)
    )
    for old_rel, new_rel in zip(old_side, new_side):
        yield old_rel, new_rel
        # Follow chained to_field references (FK targeting a FK, etc.).
        yield from _related_non_m2m_objects(
            old_rel.remote_field, new_rel.remote_field
        )
74
75
class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.

    Intended for use as a context manager: deferred statements (indexes,
    foreign keys, ...) collected during the block are executed on successful
    exit. All ``sql_*`` attributes below are printf-style templates that
    backends may override.
    """

    # Overrideable SQL templates
    # Tables.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Columns. The "ALTER COLUMN" fragments are spliced into
    # sql_alter_column's %(changes)s placeholder.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraints.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    # Foreign keys. The inline variants are None here; backends that support
    # inline FK syntax override them.
    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Indexes.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary keys.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    sql_delete_procedure = "DROP PROCEDURE %(procedure)s"

    # Table/column comments.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
147
    def __init__(self, connection, collect_sql=False, atomic=True):
        """
        Wrap ``connection`` for schema editing.

        When ``collect_sql`` is True, statements are accumulated in
        ``self.collected_sql`` instead of being executed. ``atomic`` requests
        that changes run inside a single transaction; it is only honored when
        the backend can roll DDL back.
        """
        self.connection = connection
        self.collect_sql = collect_sql
        if self.collect_sql:
            self.collected_sql = []
        self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
154
155 # State-managing methods
156
    def __enter__(self):
        """Start an edit session: reset deferred SQL, open a transaction."""
        # Statements that must run after the main DDL (FKs, indexes, ...).
        self.deferred_sql = []
        if self.atomic_migration:
            self.atomic = atomic(self.connection.alias)
            self.atomic.__enter__()
        return self
163
    def __exit__(self, exc_type, exc_value, traceback):
        """Run deferred SQL (only on success), then close the transaction."""
        if exc_type is None:
            for sql in self.deferred_sql:
                self.execute(sql)
        # Always exit the atomic block so the transaction commits/rolls back.
        if self.atomic_migration:
            self.atomic.__exit__(exc_type, exc_value, traceback)
170
171 # Core utility functions
172
    def execute(self, sql, params=()):
        """
        Execute the given SQL statement, with optional parameters.

        ``params=None`` marks pre-interpolated SQL that must not undergo
        placeholder substitution. In collect mode, statements are appended to
        ``self.collected_sql`` instead of being executed.
        """
        # Don't perform the transactional DDL check if SQL is being collected
        # as it's not going to be executed anyway.
        if (
            not self.collect_sql
            and self.connection.in_atomic_block
            and not self.connection.features.can_rollback_ddl
        ):
            raise TransactionManagementError(
                "Executing DDL statements while in a transaction on databases "
                "that can't perform a rollback is prohibited."
            )
        # Account for non-string statement objects.
        sql = str(sql)
        # Log the command we're running, then run it
        logger.debug(
            "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
        )
        if self.collect_sql:
            ending = "" if sql.rstrip().endswith(";") else ";"
            if params is not None:
                # Inline the parameters so the collected script is runnable
                # as-is; quote_value() is backend-specific.
                self.collected_sql.append(
                    (sql % tuple(map(self.quote_value, params))) + ending
                )
            else:
                # Pre-interpolated SQL: append verbatim.
                self.collected_sql.append(sql + ending)
        else:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
203
    def quote_name(self, name):
        """Quote a table/column identifier using the backend's rules."""
        return self.connection.ops.quote_name(name)
206
    def table_sql(self, model):
        """
        Take a model and return its table definition.

        Returns a ``(sql, params)`` pair. FK and autoincrement statements
        that must run after table creation are appended to
        ``self.deferred_sql`` as a side effect.
        """
        # Create column SQL, add FK deferreds if needed.
        column_sqls = []
        params = []
        for field in model._meta.local_fields:
            # SQL.
            definition, extra_params = self.column_sql(model, field)
            if definition is None:
                # Field has no column of its own (e.g. M2M).
                continue
            # Check constraints can go on the column SQL here.
            db_params = field.db_parameters(connection=self.connection)
            if db_params["check"]:
                definition += " " + self.sql_check_constraint % db_params
            # Autoincrement SQL (for backends with inline variant).
            col_type_suffix = field.db_type_suffix(connection=self.connection)
            if col_type_suffix:
                definition += " %s" % col_type_suffix
            params.extend(extra_params)
            # FK.
            if field.remote_field and field.db_constraint:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                if self.sql_create_inline_fk:
                    definition += " " + self.sql_create_inline_fk % {
                        "to_table": self.quote_name(to_table),
                        "to_column": self.quote_name(to_column),
                    }
                elif self.connection.features.supports_foreign_keys:
                    # No inline syntax: add the FK constraint afterwards.
                    self.deferred_sql.append(
                        self._create_fk_sql(
                            model, field, "_fk_%(to_table)s_%(to_column)s"
                        )
                    )
            # Add the SQL to our big list.
            column_sqls.append(f"{self.quote_name(field.column)} {definition}")
            # Autoincrement SQL (for backends with post table definition
            # variant).
            if field.get_internal_type() in (
                "AutoField",
                "BigAutoField",
                "SmallAutoField",
            ):
                autoinc_sql = self.connection.ops.autoinc_sql(
                    model._meta.db_table, field.column
                )
                if autoinc_sql:
                    self.deferred_sql.extend(autoinc_sql)
        constraints = [
            constraint.constraint_sql(model, self)
            for constraint in model._meta.constraints
        ]
        sql = self.sql_create_table % {
            "table": self.quote_name(model._meta.db_table),
            # Empty constraint SQL (unsupported constraints) is filtered out.
            "definition": ", ".join(
                str(constraint)
                for constraint in (*column_sqls, *constraints)
                if constraint
            ),
        }
        if model._meta.db_tablespace:
            tablespace_sql = self.connection.ops.tablespace_sql(
                model._meta.db_tablespace
            )
            if tablespace_sql:
                sql += " " + tablespace_sql
        return sql, params
276
277 # Field <-> database mapping functions
278
    def _iter_column_sql(
        self, column_db_type, params, model, field, field_db_params, include_default
    ):
        """
        Yield the space-separated fragments of a column definition: type,
        collation, inline comment, default, nullability, key/unique markers,
        and tablespace.

        NOTE: appends any default-value query parameters to the caller's
        ``params`` list as a side effect (see column_sql()).
        """
        yield column_db_type
        if collation := field_db_params.get("collation"):
            yield self._collate_sql(collation)
        if self.connection.features.supports_comments_inline and field.db_comment:
            yield self._comment_sql(field.db_comment)
        # Work out nullability.
        null = field.null
        # Include a default value, if requested.
        include_default = (
            include_default
            and not self.skip_default(field)
            and
            # Don't include a default value if it's a nullable field and the
            # default cannot be dropped in the ALTER COLUMN statement (e.g.
            # MySQL longtext and longblob).
            not (null and self.skip_default_on_alter(field))
        )
        if include_default:
            default_value = self.effective_default(field)
            if default_value is not None:
                column_default = "DEFAULT " + self._column_default_sql(field)
                if self.connection.features.requires_literal_defaults:
                    # Some databases can't take defaults as a parameter (Oracle).
                    # If this is the case, the individual schema backend should
                    # implement prepare_default().
                    yield column_default % self.prepare_default(default_value)
                else:
                    yield column_default
                    params.append(default_value)
        # Oracle treats the empty string ('') as null, so coerce the null
        # option whenever '' is a possible value.
        if (
            field.empty_strings_allowed
            and not field.primary_key
            and self.connection.features.interprets_empty_strings_as_nulls
        ):
            null = True
        if not null:
            yield "NOT NULL"
        elif not self.connection.features.implied_column_null:
            yield "NULL"
        if field.primary_key:
            yield "PRIMARY KEY"
        elif field.unique:
            yield "UNIQUE"
        # Optionally add the tablespace if it's an implicitly indexed column.
        tablespace = field.db_tablespace or model._meta.db_tablespace
        if (
            tablespace
            and self.connection.features.supports_tablespaces
            and field.unique
        ):
            yield self.connection.ops.tablespace_sql(tablespace, inline=True)
335
    def column_sql(self, model, field, include_default=False):
        """
        Return the column definition for a field. The field must already have
        had set_attributes_from_name() called.

        Returns ``(definition_sql, params)``; both are None for fields that
        have no database column of their own (e.g. M2M).
        """
        # Get the column's type and use that as the basis of the SQL.
        field_db_params = field.db_parameters(connection=self.connection)
        column_db_type = field_db_params["type"]
        # Check for fields that aren't actually columns (e.g. M2M).
        if column_db_type is None:
            return None, None
        params = []
        return (
            " ".join(
                # This appends to the params being returned: the join consumes
                # the generator, which mutates `params` as it goes.
                self._iter_column_sql(
                    column_db_type,
                    params,
                    model,
                    field,
                    field_db_params,
                    include_default,
                )
            ),
            params,
        )
362
    def skip_default(self, field):
        """
        Some backends don't accept default values for certain column types
        (i.e. MySQL longtext and longblob). Hook for backends to override.
        """
        return False
369
    def skip_default_on_alter(self, field):
        """
        Some backends don't accept default values for certain column types
        (i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
        Hook for backends to override.
        """
        return False
376
    def prepare_default(self, value):
        """
        Return a literal SQL snippet for ``value``.

        Only used for backends which have the requires_literal_defaults
        feature; such backends must override this method.
        """
        raise NotImplementedError(
            "subclasses of BaseDatabaseSchemaEditor for backends which have "
            "requires_literal_defaults must provide a prepare_default() method"
        )
385
    def _column_default_sql(self, field):
        """
        Return the SQL to use in a DEFAULT clause. The resulting string should
        contain a '%s' placeholder for a default value.
        """
        return "%s"
392
393 @staticmethod
394 def _effective_default(field):
395 # This method allows testing its logic without a connection.
396 if field.has_default():
397 default = field.get_default()
398 elif not field.null and field.blank and field.empty_strings_allowed:
399 if field.get_internal_type() == "BinaryField":
400 default = b""
401 else:
402 default = ""
403 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
404 internal_type = field.get_internal_type()
405 if internal_type == "DateTimeField":
406 default = timezone.now()
407 else:
408 default = datetime.now()
409 if internal_type == "DateField":
410 default = default.date()
411 elif internal_type == "TimeField":
412 default = default.time()
413 else:
414 default = None
415 return default
416
    def effective_default(self, field):
        """Return a field's effective database default value, prepared for
        saving via the field's get_db_prep_save()."""
        return field.get_db_prep_save(self._effective_default(field), self.connection)
420
    def quote_value(self, value):
        """
        Return a quoted version of the value so it's safe to use in an SQL
        string. This is not safe against injection from user code; it is
        intended only for use in making SQL scripts or preparing default values
        for particularly tricky backends (defaults are not user-defined, though,
        so this is safe).

        Backends must override this; the base class raises
        NotImplementedError.
        """
        raise NotImplementedError()
430
431 # Actions
432
    def create_model(self, model):
        """
        Create a table and any accompanying indexes or unique constraints for
        the given `model`. Also recursively creates tables for auto-created
        M2M through models.
        """
        sql, params = self.table_sql(model)
        # Prevent using [] as params, in the case a literal '%' is used in the
        # definition.
        self.execute(sql, params or None)

        if self.connection.features.supports_comments:
            # Add table comment.
            if model._meta.db_table_comment:
                self.alter_db_table_comment(model, None, model._meta.db_table_comment)
            # Add column comments.
            if not self.connection.features.supports_comments_inline:
                for field in model._meta.local_fields:
                    if field.db_comment:
                        field_db_params = field.db_parameters(
                            connection=self.connection
                        )
                        field_type = field_db_params["type"]
                        self.execute(
                            *self._alter_column_comment_sql(
                                model, field, field_type, field.db_comment
                            )
                        )
        # Add any field index (deferred as SQLite _remake_table needs it).
        self.deferred_sql.extend(self._model_indexes_sql(model))

        # Make M2M tables
        for field in model._meta.local_many_to_many:
            if field.remote_field.through._meta.auto_created:
                self.create_model(field.remote_field.through)
467
468 def delete_model(self, model):
469 """Delete a model from the database."""
470 # Handle auto-created intermediary models
471 for field in model._meta.local_many_to_many:
472 if field.remote_field.through._meta.auto_created:
473 self.delete_model(field.remote_field.through)
474
475 # Delete the table
476 self.execute(
477 self.sql_delete_table
478 % {
479 "table": self.quote_name(model._meta.db_table),
480 }
481 )
482 # Remove all deferred statements referencing the deleted table.
483 for sql in list(self.deferred_sql):
484 if isinstance(sql, Statement) and sql.references_table(
485 model._meta.db_table
486 ):
487 self.deferred_sql.remove(sql)
488
489 def add_index(self, model, index):
490 """Add an index on a model."""
491 if (
492 index.contains_expressions
493 and not self.connection.features.supports_expression_indexes
494 ):
495 return None
496 # Index.create_sql returns interpolated SQL which makes params=None a
497 # necessity to avoid escaping attempts on execution.
498 self.execute(index.create_sql(model, self), params=None)
499
500 def remove_index(self, model, index):
501 """Remove an index from a model."""
502 if (
503 index.contains_expressions
504 and not self.connection.features.supports_expression_indexes
505 ):
506 return None
507 self.execute(index.remove_sql(model, self))
508
509 def rename_index(self, model, old_index, new_index):
510 if self.connection.features.can_rename_index:
511 self.execute(
512 self._rename_index_sql(model, old_index.name, new_index.name),
513 params=None,
514 )
515 else:
516 self.remove_index(model, old_index)
517 self.add_index(model, new_index)
518
519 def add_constraint(self, model, constraint):
520 """Add a constraint to a model."""
521 sql = constraint.create_sql(model, self)
522 if sql:
523 # Constraint.create_sql returns interpolated SQL which makes
524 # params=None a necessity to avoid escaping attempts on execution.
525 self.execute(sql, params=None)
526
527 def remove_constraint(self, model, constraint):
528 """Remove a constraint from a model."""
529 sql = constraint.remove_sql(model, self)
530 if sql:
531 self.execute(sql)
532
533 def alter_db_table(self, model, old_db_table, new_db_table):
534 """Rename the table a model points to."""
535 if old_db_table == new_db_table or (
536 self.connection.features.ignores_table_name_case
537 and old_db_table.lower() == new_db_table.lower()
538 ):
539 return
540 self.execute(
541 self.sql_rename_table
542 % {
543 "old_table": self.quote_name(old_db_table),
544 "new_table": self.quote_name(new_db_table),
545 }
546 )
547 # Rename all references to the old table name.
548 for sql in self.deferred_sql:
549 if isinstance(sql, Statement):
550 sql.rename_table_references(old_db_table, new_db_table)
551
552 def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
553 self.execute(
554 self.sql_alter_table_comment
555 % {
556 "table": self.quote_name(model._meta.db_table),
557 "comment": self.quote_value(new_db_table_comment or ""),
558 }
559 )
560
561 def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace):
562 """Move a model's table between tablespaces."""
563 self.execute(
564 self.sql_retablespace_table
565 % {
566 "table": self.quote_name(model._meta.db_table),
567 "old_tablespace": self.quote_name(old_db_tablespace),
568 "new_tablespace": self.quote_name(new_db_tablespace),
569 }
570 )
571
    def add_field(self, model, field):
        """
        Create a field on a model. Usually involves adding a column, but may
        involve adding a table instead (for M2M fields).
        """
        # Special-case implicit M2M tables
        if field.many_to_many and field.remote_field.through._meta.auto_created:
            return self.create_model(field.remote_field.through)
        # Get the column's definition
        definition, params = self.column_sql(model, field, include_default=True)
        # It might not actually have a column behind it
        if definition is None:
            return
        if col_type_suffix := field.db_type_suffix(connection=self.connection):
            definition += f" {col_type_suffix}"
        # Check constraints can go on the column SQL here
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        if (
            field.remote_field
            and self.connection.features.supports_foreign_keys
            and field.db_constraint
        ):
            constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
            # Add FK constraint inline, if supported.
            if self.sql_create_column_inline_fk:
                to_table = field.remote_field.model._meta.db_table
                to_column = field.remote_field.model._meta.get_field(
                    field.remote_field.field_name
                ).column
                namespace, _ = split_identifier(model._meta.db_table)
                definition += " " + self.sql_create_column_inline_fk % {
                    "name": self._fk_constraint_name(model, field, constraint_suffix),
                    "namespace": "%s." % self.quote_name(namespace)
                    if namespace
                    else "",
                    "column": self.quote_name(field.column),
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                    "deferrable": self.connection.ops.deferrable_sql(),
                }
            # Otherwise, add FK constraints later.
            else:
                self.deferred_sql.append(
                    self._create_fk_sql(model, field, constraint_suffix)
                )
        # Build the SQL and run it
        sql = self.sql_create_column % {
            "table": self.quote_name(model._meta.db_table),
            "column": self.quote_name(field.column),
            "definition": definition,
        }
        self.execute(sql, params)
        # Drop the default if we need to
        # (Plain usually does not use in-database defaults): the column was
        # added with a DEFAULT so existing rows got values; remove it so new
        # rows don't silently pick it up.
        if (
            not self.skip_default_on_alter(field)
            and self.effective_default(field) is not None
        ):
            changes_sql, params = self._alter_column_default_sql(
                model, None, field, drop=True
            )
            sql = self.sql_alter_column % {
                "table": self.quote_name(model._meta.db_table),
                "changes": changes_sql,
            }
            self.execute(sql, params)
        # Add field comment, if required.
        if (
            field.db_comment
            and self.connection.features.supports_comments
            and not self.connection.features.supports_comments_inline
        ):
            field_type = db_params["type"]
            self.execute(
                *self._alter_column_comment_sql(
                    model, field, field_type, field.db_comment
                )
            )
        # Add an index, if required
        self.deferred_sql.extend(self._field_indexes_sql(model, field))
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
657
    def remove_field(self, model, field):
        """
        Remove a field from a model. Usually involves deleting a column,
        but for M2Ms may involve deleting a table.
        """
        # Special-case implicit M2M tables
        if field.many_to_many and field.remote_field.through._meta.auto_created:
            return self.delete_model(field.remote_field.through)
        # It might not actually have a column behind it
        if field.db_parameters(connection=self.connection)["type"] is None:
            return
        # Drop any FK constraints, MySQL requires explicit deletion
        if field.remote_field:
            fk_names = self._constraint_names(model, [field.column], foreign_key=True)
            for fk_name in fk_names:
                self.execute(self._delete_fk_sql(model, fk_name))
        # Delete the column
        sql = self.sql_delete_column % {
            "table": self.quote_name(model._meta.db_table),
            "column": self.quote_name(field.column),
        }
        self.execute(sql)
        # Reset connection if required
        if self.connection.features.connection_persists_old_columns:
            self.connection.close()
        # Remove all deferred statements referencing the deleted column.
        # (Iterate a copy since we mutate the list while looping.)
        for sql in list(self.deferred_sql):
            if isinstance(sql, Statement) and sql.references_column(
                model._meta.db_table, field.column
            ):
                self.deferred_sql.remove(sql)
689
    def alter_field(self, model, old_field, new_field, strict=False):
        """
        Allow a field's type, uniqueness, nullability, default, column,
        constraints, etc. to be modified.
        `old_field` is required to compute the necessary changes.
        If `strict` is True, raise errors if the old column does not match
        `old_field` precisely.

        Dispatches M2M-to-M2M changes to _alter_many_to_many() and column
        changes to _alter_field(); rejects incompatible conversions.
        """
        if not self._field_should_be_altered(old_field, new_field):
            return
        # Ensure this field is even column-based
        old_db_params = old_field.db_parameters(connection=self.connection)
        old_type = old_db_params["type"]
        new_db_params = new_field.db_parameters(connection=self.connection)
        new_type = new_db_params["type"]
        if (old_type is None and old_field.remote_field is None) or (
            new_type is None and new_field.remote_field is None
        ):
            raise ValueError(
                "Cannot alter field {} into {} - they do not properly define "
                "db_type (are you using a badly-written custom field?)".format(
                    old_field, new_field
                ),
            )
        elif (
            old_type is None
            and new_type is None
            and (
                old_field.remote_field.through
                and new_field.remote_field.through
                and old_field.remote_field.through._meta.auto_created
                and new_field.remote_field.through._meta.auto_created
            )
        ):
            # Both sides are auto-created M2M through tables.
            return self._alter_many_to_many(model, old_field, new_field, strict)
        elif (
            old_type is None
            and new_type is None
            and (
                old_field.remote_field.through
                and new_field.remote_field.through
                and not old_field.remote_field.through._meta.auto_created
                and not new_field.remote_field.through._meta.auto_created
            )
        ):
            # Both sides have through models; this is a no-op.
            return
        elif old_type is None or new_type is None:
            raise ValueError(
                "Cannot alter field {} into {} - they are not compatible types "
                "(you cannot alter to or from M2M fields, or add or remove "
                "through= on M2M fields)".format(old_field, new_field)
            )

        self._alter_field(
            model,
            old_field,
            new_field,
            old_type,
            new_type,
            old_db_params,
            new_db_params,
            strict,
        )
754
755 def _field_db_check(self, field, field_db_params):
756 # Always check constraints with the same mocked column name to avoid
757 # recreating constrains when the column is renamed.
758 check_constraints = self.connection.data_type_check_constraints
759 data = field.db_type_parameters(self.connection)
760 data["column"] = "__column_name__"
761 try:
762 return check_constraints[field.get_internal_type()] % data
763 except KeyError:
764 return None
765
766 def _alter_field(
767 self,
768 model,
769 old_field,
770 new_field,
771 old_type,
772 new_type,
773 old_db_params,
774 new_db_params,
775 strict=False,
776 ):
777 """Perform a "physical" (non-ManyToMany) field update."""
778 # Drop any FK constraints, we'll remake them later
779 fks_dropped = set()
780 if (
781 self.connection.features.supports_foreign_keys
782 and old_field.remote_field
783 and old_field.db_constraint
784 and self._field_should_be_altered(
785 old_field,
786 new_field,
787 ignore={"db_comment"},
788 )
789 ):
790 fk_names = self._constraint_names(
791 model, [old_field.column], foreign_key=True
792 )
793 if strict and len(fk_names) != 1:
794 raise ValueError(
795 "Found wrong number ({}) of foreign key constraints for {}.{}".format(
796 len(fk_names),
797 model._meta.db_table,
798 old_field.column,
799 )
800 )
801 for fk_name in fk_names:
802 fks_dropped.add((old_field.column,))
803 self.execute(self._delete_fk_sql(model, fk_name))
804 # Has unique been removed?
805 if old_field.unique and (
806 not new_field.unique or self._field_became_primary_key(old_field, new_field)
807 ):
808 # Find the unique constraint for this field
809 meta_constraint_names = {
810 constraint.name for constraint in model._meta.constraints
811 }
812 constraint_names = self._constraint_names(
813 model,
814 [old_field.column],
815 unique=True,
816 primary_key=False,
817 exclude=meta_constraint_names,
818 )
819 if strict and len(constraint_names) != 1:
820 raise ValueError(
821 "Found wrong number ({}) of unique constraints for {}.{}".format(
822 len(constraint_names),
823 model._meta.db_table,
824 old_field.column,
825 )
826 )
827 for constraint_name in constraint_names:
828 self.execute(self._delete_unique_sql(model, constraint_name))
829 # Drop incoming FK constraints if the field is a primary key or unique,
830 # which might be a to_field target, and things are going to change.
831 old_collation = old_db_params.get("collation")
832 new_collation = new_db_params.get("collation")
833 drop_foreign_keys = (
834 self.connection.features.supports_foreign_keys
835 and (
836 (old_field.primary_key and new_field.primary_key)
837 or (old_field.unique and new_field.unique)
838 )
839 and ((old_type != new_type) or (old_collation != new_collation))
840 )
841 if drop_foreign_keys:
842 # '_meta.related_field' also contains M2M reverse fields, these
843 # will be filtered out
844 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
845 rel_fk_names = self._constraint_names(
846 new_rel.related_model, [new_rel.field.column], foreign_key=True
847 )
848 for fk_name in rel_fk_names:
849 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
850 # Removed an index? (no strict check, as multiple indexes are possible)
851 # Remove indexes if db_index switched to False or a unique constraint
852 # will now be used in lieu of an index. The following lines from the
853 # truth table show all True cases; the rest are False:
854 #
855 # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
856 # ------------------------------------------------------------------------------
857 # True | False | False | False
858 # True | False | False | True
859 # True | False | True | True
860 if (
861 old_field.db_index
862 and not old_field.unique
863 and (not new_field.db_index or new_field.unique)
864 ):
865 # Find the index for this field
866 meta_index_names = {index.name for index in model._meta.indexes}
867 # Retrieve only BTREE indexes since this is what's created with
868 # db_index=True.
869 index_names = self._constraint_names(
870 model,
871 [old_field.column],
872 index=True,
873 type_=Index.suffix,
874 exclude=meta_index_names,
875 )
876 for index_name in index_names:
877 # The only way to check if an index was created with
878 # db_index=True or with Index(['field'], name='foo')
879 # is to look at its name (refs #28053).
880 self.execute(self._delete_index_sql(model, index_name))
881 # Change check constraints?
882 old_db_check = self._field_db_check(old_field, old_db_params)
883 new_db_check = self._field_db_check(new_field, new_db_params)
884 if old_db_check != new_db_check and old_db_check:
885 meta_constraint_names = {
886 constraint.name for constraint in model._meta.constraints
887 }
888 constraint_names = self._constraint_names(
889 model,
890 [old_field.column],
891 check=True,
892 exclude=meta_constraint_names,
893 )
894 if strict and len(constraint_names) != 1:
895 raise ValueError(
896 "Found wrong number ({}) of check constraints for {}.{}".format(
897 len(constraint_names),
898 model._meta.db_table,
899 old_field.column,
900 )
901 )
902 for constraint_name in constraint_names:
903 self.execute(self._delete_check_sql(model, constraint_name))
904 # Have they renamed the column?
905 if old_field.column != new_field.column:
906 self.execute(
907 self._rename_field_sql(
908 model._meta.db_table, old_field, new_field, new_type
909 )
910 )
911 # Rename all references to the renamed column.
912 for sql in self.deferred_sql:
913 if isinstance(sql, Statement):
914 sql.rename_column_references(
915 model._meta.db_table, old_field.column, new_field.column
916 )
917 # Next, start accumulating actions to do
918 actions = []
919 null_actions = []
920 post_actions = []
921 # Type suffix change? (e.g. auto increment).
922 old_type_suffix = old_field.db_type_suffix(connection=self.connection)
923 new_type_suffix = new_field.db_type_suffix(connection=self.connection)
924 # Type, collation, or comment change?
925 if (
926 old_type != new_type
927 or old_type_suffix != new_type_suffix
928 or old_collation != new_collation
929 or (
930 self.connection.features.supports_comments
931 and old_field.db_comment != new_field.db_comment
932 )
933 ):
934 fragment, other_actions = self._alter_column_type_sql(
935 model, old_field, new_field, new_type, old_collation, new_collation
936 )
937 actions.append(fragment)
938 post_actions.extend(other_actions)
939 # When changing a column NULL constraint to NOT NULL with a given
940 # default value, we need to perform 4 steps:
941 # 1. Add a default for new incoming writes
942 # 2. Update existing NULL rows with new default
943 # 3. Replace NULL constraint with NOT NULL
944 # 4. Drop the default again.
945 # Default change?
946 needs_database_default = False
947 if old_field.null and not new_field.null:
948 old_default = self.effective_default(old_field)
949 new_default = self.effective_default(new_field)
950 if (
951 not self.skip_default_on_alter(new_field)
952 and old_default != new_default
953 and new_default is not None
954 ):
955 needs_database_default = True
956 actions.append(
957 self._alter_column_default_sql(model, old_field, new_field)
958 )
959 # Nullability change?
960 if old_field.null != new_field.null:
961 fragment = self._alter_column_null_sql(model, old_field, new_field)
962 if fragment:
963 null_actions.append(fragment)
964 # Only if we have a default and there is a change from NULL to NOT NULL
965 four_way_default_alteration = new_field.has_default() and (
966 old_field.null and not new_field.null
967 )
968 if actions or null_actions:
969 if not four_way_default_alteration:
970 # If we don't have to do a 4-way default alteration we can
971 # directly run a (NOT) NULL alteration
972 actions += null_actions
973 # Combine actions together if we can (e.g. postgres)
974 if self.connection.features.supports_combined_alters and actions:
975 sql, params = tuple(zip(*actions))
976 actions = [(", ".join(sql), sum(params, []))]
977 # Apply those actions
978 for sql, params in actions:
979 self.execute(
980 self.sql_alter_column
981 % {
982 "table": self.quote_name(model._meta.db_table),
983 "changes": sql,
984 },
985 params,
986 )
987 if four_way_default_alteration:
988 # Update existing rows with default value
989 self.execute(
990 self.sql_update_with_default
991 % {
992 "table": self.quote_name(model._meta.db_table),
993 "column": self.quote_name(new_field.column),
994 "default": "%s",
995 },
996 [new_default],
997 )
998 # Since we didn't run a NOT NULL change before we need to do it
999 # now
1000 for sql, params in null_actions:
1001 self.execute(
1002 self.sql_alter_column
1003 % {
1004 "table": self.quote_name(model._meta.db_table),
1005 "changes": sql,
1006 },
1007 params,
1008 )
1009 if post_actions:
1010 for sql, params in post_actions:
1011 self.execute(sql, params)
1012 # If primary_key changed to False, delete the primary key constraint.
1013 if old_field.primary_key and not new_field.primary_key:
1014 self._delete_primary_key(model, strict)
1015 # Added a unique?
1016 if self._unique_should_be_added(old_field, new_field):
1017 self.execute(self._create_unique_sql(model, [new_field]))
1018 # Added an index? Add an index if db_index switched to True or a unique
1019 # constraint will no longer be used in lieu of an index. The following
1020 # lines from the truth table show all True cases; the rest are False:
1021 #
1022 # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
1023 # ------------------------------------------------------------------------------
1024 # False | False | True | False
1025 # False | True | True | False
1026 # True | True | True | False
1027 if (
1028 (not old_field.db_index or old_field.unique)
1029 and new_field.db_index
1030 and not new_field.unique
1031 ):
1032 self.execute(self._create_index_sql(model, fields=[new_field]))
1033 # Type alteration on primary key? Then we need to alter the column
1034 # referring to us.
1035 rels_to_update = []
1036 if drop_foreign_keys:
1037 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1038 # Changed to become primary key?
1039 if self._field_became_primary_key(old_field, new_field):
1040 # Make the new one
1041 self.execute(self._create_primary_key_sql(model, new_field))
1042 # Update all referencing columns
1043 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
1044 # Handle our type alters on the other end of rels from the PK stuff above
1045 for old_rel, new_rel in rels_to_update:
1046 rel_db_params = new_rel.field.db_parameters(connection=self.connection)
1047 rel_type = rel_db_params["type"]
1048 rel_collation = rel_db_params.get("collation")
1049 old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
1050 old_rel_collation = old_rel_db_params.get("collation")
1051 fragment, other_actions = self._alter_column_type_sql(
1052 new_rel.related_model,
1053 old_rel.field,
1054 new_rel.field,
1055 rel_type,
1056 old_rel_collation,
1057 rel_collation,
1058 )
1059 self.execute(
1060 self.sql_alter_column
1061 % {
1062 "table": self.quote_name(new_rel.related_model._meta.db_table),
1063 "changes": fragment[0],
1064 },
1065 fragment[1],
1066 )
1067 for sql, params in other_actions:
1068 self.execute(sql, params)
1069 # Does it have a foreign key?
1070 if (
1071 self.connection.features.supports_foreign_keys
1072 and new_field.remote_field
1073 and (
1074 fks_dropped or not old_field.remote_field or not old_field.db_constraint
1075 )
1076 and new_field.db_constraint
1077 ):
1078 self.execute(
1079 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
1080 )
1081 # Rebuild FKs that pointed to us if we previously had to drop them
1082 if drop_foreign_keys:
1083 for _, rel in rels_to_update:
1084 if rel.field.db_constraint:
1085 self.execute(
1086 self._create_fk_sql(rel.related_model, rel.field, "_fk")
1087 )
1088 # Does it have check constraints we need to add?
1089 if old_db_check != new_db_check and new_db_check:
1090 constraint_name = self._create_index_name(
1091 model._meta.db_table, [new_field.column], suffix="_check"
1092 )
1093 self.execute(
1094 self._create_check_sql(model, constraint_name, new_db_params["check"])
1095 )
1096 # Drop the default if we need to
1097 # (Plain usually does not use in-database defaults)
1098 if needs_database_default:
1099 changes_sql, params = self._alter_column_default_sql(
1100 model, old_field, new_field, drop=True
1101 )
1102 sql = self.sql_alter_column % {
1103 "table": self.quote_name(model._meta.db_table),
1104 "changes": changes_sql,
1105 }
1106 self.execute(sql, params)
1107 # Reset connection if required
1108 if self.connection.features.connection_persists_old_columns:
1109 self.connection.close()
1110
1111 def _alter_column_null_sql(self, model, old_field, new_field):
1112 """
1113 Hook to specialize column null alteration.
1114
1115 Return a (sql, params) fragment to set a column to null or non-null
1116 as required by new_field, or None if no changes are required.
1117 """
1118 if (
1119 self.connection.features.interprets_empty_strings_as_nulls
1120 and new_field.empty_strings_allowed
1121 ):
1122 # The field is nullable in the database anyway, leave it alone.
1123 return
1124 else:
1125 new_db_params = new_field.db_parameters(connection=self.connection)
1126 sql = (
1127 self.sql_alter_column_null
1128 if new_field.null
1129 else self.sql_alter_column_not_null
1130 )
1131 return (
1132 sql
1133 % {
1134 "column": self.quote_name(new_field.column),
1135 "type": new_db_params["type"],
1136 },
1137 [],
1138 )
1139
1140 def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
1141 """
1142 Hook to specialize column default alteration.
1143
1144 Return a (sql, params) fragment to add or drop (depending on the drop
1145 argument) a default to new_field's column.
1146 """
1147 new_default = self.effective_default(new_field)
1148 default = self._column_default_sql(new_field)
1149 params = [new_default]
1150
1151 if drop:
1152 params = []
1153 elif self.connection.features.requires_literal_defaults:
1154 # Some databases (Oracle) can't take defaults as a parameter
1155 # If this is the case, the SchemaEditor for that database should
1156 # implement prepare_default().
1157 default = self.prepare_default(new_default)
1158 params = []
1159
1160 new_db_params = new_field.db_parameters(connection=self.connection)
1161 if drop:
1162 if new_field.null:
1163 sql = self.sql_alter_column_no_default_null
1164 else:
1165 sql = self.sql_alter_column_no_default
1166 else:
1167 sql = self.sql_alter_column_default
1168 return (
1169 sql
1170 % {
1171 "column": self.quote_name(new_field.column),
1172 "type": new_db_params["type"],
1173 "default": default,
1174 },
1175 params,
1176 )
1177
1178 def _alter_column_type_sql(
1179 self, model, old_field, new_field, new_type, old_collation, new_collation
1180 ):
1181 """
1182 Hook to specialize column type alteration for different backends,
1183 for cases when a creation type is different to an alteration type
1184 (e.g. SERIAL in PostgreSQL, PostGIS fields).
1185
1186 Return a two-tuple of: an SQL fragment of (sql, params) to insert into
1187 an ALTER TABLE statement and a list of extra (sql, params) tuples to
1188 run once the field is altered.
1189 """
1190 other_actions = []
1191 if collate_sql := self._collate_sql(
1192 new_collation, old_collation, model._meta.db_table
1193 ):
1194 collate_sql = f" {collate_sql}"
1195 else:
1196 collate_sql = ""
1197 # Comment change?
1198 comment_sql = ""
1199 if self.connection.features.supports_comments and not new_field.many_to_many:
1200 if old_field.db_comment != new_field.db_comment:
1201 # PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
1202 # 'COMMENT ON ...' at the same time.
1203 sql, params = self._alter_column_comment_sql(
1204 model, new_field, new_type, new_field.db_comment
1205 )
1206 if sql:
1207 other_actions.append((sql, params))
1208 if new_field.db_comment:
1209 comment_sql = self._comment_sql(new_field.db_comment)
1210 return (
1211 (
1212 self.sql_alter_column_type
1213 % {
1214 "column": self.quote_name(new_field.column),
1215 "type": new_type,
1216 "collation": collate_sql,
1217 "comment": comment_sql,
1218 },
1219 [],
1220 ),
1221 other_actions,
1222 )
1223
1224 def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
1225 return (
1226 self.sql_alter_column_comment
1227 % {
1228 "table": self.quote_name(model._meta.db_table),
1229 "column": self.quote_name(new_field.column),
1230 "comment": self._comment_sql(new_db_comment),
1231 },
1232 [],
1233 )
1234
1235 def _comment_sql(self, comment):
1236 return self.quote_value(comment or "")
1237
1238 def _alter_many_to_many(self, model, old_field, new_field, strict):
1239 """Alter M2Ms to repoint their to= endpoints."""
1240 # Rename the through table
1241 if (
1242 old_field.remote_field.through._meta.db_table
1243 != new_field.remote_field.through._meta.db_table
1244 ):
1245 self.alter_db_table(
1246 old_field.remote_field.through,
1247 old_field.remote_field.through._meta.db_table,
1248 new_field.remote_field.through._meta.db_table,
1249 )
1250 # Repoint the FK to the other side
1251 self.alter_field(
1252 new_field.remote_field.through,
1253 # The field that points to the target model is needed, so we can
1254 # tell alter_field to change it - this is m2m_reverse_field_name()
1255 # (as opposed to m2m_field_name(), which points to our model).
1256 old_field.remote_field.through._meta.get_field(
1257 old_field.m2m_reverse_field_name()
1258 ),
1259 new_field.remote_field.through._meta.get_field(
1260 new_field.m2m_reverse_field_name()
1261 ),
1262 )
1263 self.alter_field(
1264 new_field.remote_field.through,
1265 # for self-referential models we need to alter field from the other end too
1266 old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
1267 new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
1268 )
1269
1270 def _create_index_name(self, table_name, column_names, suffix=""):
1271 """
1272 Generate a unique name for an index/unique constraint.
1273
1274 The name is divided into 3 parts: the table name, the column names,
1275 and a unique digest and suffix.
1276 """
1277 _, table_name = split_identifier(table_name)
1278 hash_suffix_part = "{}{}".format(
1279 names_digest(table_name, *column_names, length=8),
1280 suffix,
1281 )
1282 max_length = self.connection.ops.max_name_length() or 200
1283 # If everything fits into max_length, use that name.
1284 index_name = "{}_{}_{}".format(
1285 table_name, "_".join(column_names), hash_suffix_part
1286 )
1287 if len(index_name) <= max_length:
1288 return index_name
1289 # Shorten a long suffix.
1290 if len(hash_suffix_part) > max_length / 3:
1291 hash_suffix_part = hash_suffix_part[: max_length // 3]
1292 other_length = (max_length - len(hash_suffix_part)) // 2 - 1
1293 index_name = "{}_{}_{}".format(
1294 table_name[:other_length],
1295 "_".join(column_names)[:other_length],
1296 hash_suffix_part,
1297 )
1298 # Prepend D if needed to prevent the name from starting with an
1299 # underscore or a number (not permitted on Oracle).
1300 if index_name[0] == "_" or index_name[0].isdigit():
1301 index_name = "D%s" % index_name[:-1]
1302 return index_name
1303
1304 def _get_index_tablespace_sql(self, model, fields, db_tablespace=None):
1305 if db_tablespace is None:
1306 if len(fields) == 1 and fields[0].db_tablespace:
1307 db_tablespace = fields[0].db_tablespace
1308 elif settings.DEFAULT_INDEX_TABLESPACE:
1309 db_tablespace = settings.DEFAULT_INDEX_TABLESPACE
1310 elif model._meta.db_tablespace:
1311 db_tablespace = model._meta.db_tablespace
1312 if db_tablespace is not None:
1313 return " " + self.connection.ops.tablespace_sql(db_tablespace)
1314 return ""
1315
1316 def _index_condition_sql(self, condition):
1317 if condition:
1318 return " WHERE " + condition
1319 return ""
1320
1321 def _index_include_sql(self, model, columns):
1322 if not columns or not self.connection.features.supports_covering_indexes:
1323 return ""
1324 return Statement(
1325 " INCLUDE (%(columns)s)",
1326 columns=Columns(model._meta.db_table, columns, self.quote_name),
1327 )
1328
    def _create_index_sql(
        self,
        model,
        *,
        fields=None,
        name=None,
        suffix="",
        using="",
        db_tablespace=None,
        col_suffixes=(),
        sql=None,
        opclasses=(),
        condition=None,
        include=None,
        expressions=None,
    ):
        """
        Return the SQL statement to create the index for one or several fields
        or expressions. `sql` can be specified if the syntax differs from the
        standard (GIS indexes, ...).
        """
        fields = fields or []
        expressions = expressions or []
        # The compiler is only needed to render expression-based index
        # columns (the Expressions branch below).
        compiler = Query(model, alias_cols=False).get_compiler(
            connection=self.connection,
        )
        tablespace_sql = self._get_index_tablespace_sql(
            model, fields, db_tablespace=db_tablespace
        )
        columns = [field.column for field in fields]
        sql_create_index = sql or self.sql_create_index
        table = model._meta.db_table

        def create_index_name(*args, **kwargs):
            # Lazily derive the index name on first use, caching it via the
            # enclosing scope, so an explicitly passed name= always wins.
            nonlocal name
            if name is None:
                name = self._create_index_name(*args, **kwargs)
            return self.quote_name(name)

        return Statement(
            sql_create_index,
            table=Table(table, self.quote_name),
            name=IndexName(table, columns, suffix, create_index_name),
            using=using,
            columns=(
                self._index_columns(table, columns, col_suffixes, opclasses)
                if columns
                else Expressions(table, expressions, compiler, self.quote_value)
            ),
            extra=tablespace_sql,
            condition=self._index_condition_sql(condition),
            include=self._index_include_sql(model, include),
        )
1382
1383 def _delete_index_sql(self, model, name, sql=None):
1384 return Statement(
1385 sql or self.sql_delete_index,
1386 table=Table(model._meta.db_table, self.quote_name),
1387 name=self.quote_name(name),
1388 )
1389
1390 def _rename_index_sql(self, model, old_name, new_name):
1391 return Statement(
1392 self.sql_rename_index,
1393 table=Table(model._meta.db_table, self.quote_name),
1394 old_name=self.quote_name(old_name),
1395 new_name=self.quote_name(new_name),
1396 )
1397
    def _index_columns(self, table, columns, col_suffixes, opclasses):
        # Hook for backends that support operator classes (e.g. PostgreSQL);
        # the base implementation ignores opclasses.
        return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes)
1400
1401 def _model_indexes_sql(self, model):
1402 """
1403 Return a list of all index SQL statements (field indexes, Meta.indexes) for the specified model.
1404 """
1405 if not model._meta.managed or model._meta.swapped:
1406 return []
1407 output = []
1408 for field in model._meta.local_fields:
1409 output.extend(self._field_indexes_sql(model, field))
1410
1411 for index in model._meta.indexes:
1412 if (
1413 not index.contains_expressions
1414 or self.connection.features.supports_expression_indexes
1415 ):
1416 output.append(index.create_sql(model, self))
1417 return output
1418
1419 def _field_indexes_sql(self, model, field):
1420 """
1421 Return a list of all index SQL statements for the specified field.
1422 """
1423 output = []
1424 if self._field_should_be_indexed(model, field):
1425 output.append(self._create_index_sql(model, fields=[field]))
1426 return output
1427
1428 def _field_should_be_altered(self, old_field, new_field, ignore=None):
1429 ignore = ignore or set()
1430 _, old_path, old_args, old_kwargs = old_field.deconstruct()
1431 _, new_path, new_args, new_kwargs = new_field.deconstruct()
1432 # Don't alter when:
1433 # - changing only a field name
1434 # - changing an attribute that doesn't affect the schema
1435 # - changing an attribute in the provided set of ignored attributes
1436 # - adding only a db_column and the column name is not changed
1437 for attr in ignore.union(old_field.non_db_attrs):
1438 old_kwargs.pop(attr, None)
1439 for attr in ignore.union(new_field.non_db_attrs):
1440 new_kwargs.pop(attr, None)
1441 return self.quote_name(old_field.column) != self.quote_name(
1442 new_field.column
1443 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
1444
    def _field_should_be_indexed(self, model, field):
        # A unique field already gets an index via its unique constraint, so
        # only plain db_index fields need a standalone index.
        return field.db_index and not field.unique
1447
    def _field_became_primary_key(self, old_field, new_field):
        # True only on a False -> True transition of primary_key.
        return not old_field.primary_key and new_field.primary_key
1450
1451 def _unique_should_be_added(self, old_field, new_field):
1452 return (
1453 not new_field.primary_key
1454 and new_field.unique
1455 and (not old_field.unique or old_field.primary_key)
1456 )
1457
1458 def _rename_field_sql(self, table, old_field, new_field, new_type):
1459 return self.sql_rename_column % {
1460 "table": self.quote_name(table),
1461 "old_column": self.quote_name(old_field.column),
1462 "new_column": self.quote_name(new_field.column),
1463 "type": new_type,
1464 }
1465
1466 def _create_fk_sql(self, model, field, suffix):
1467 table = Table(model._meta.db_table, self.quote_name)
1468 name = self._fk_constraint_name(model, field, suffix)
1469 column = Columns(model._meta.db_table, [field.column], self.quote_name)
1470 to_table = Table(field.target_field.model._meta.db_table, self.quote_name)
1471 to_column = Columns(
1472 field.target_field.model._meta.db_table,
1473 [field.target_field.column],
1474 self.quote_name,
1475 )
1476 deferrable = self.connection.ops.deferrable_sql()
1477 return Statement(
1478 self.sql_create_fk,
1479 table=table,
1480 name=name,
1481 column=column,
1482 to_table=to_table,
1483 to_column=to_column,
1484 deferrable=deferrable,
1485 )
1486
1487 def _fk_constraint_name(self, model, field, suffix):
1488 def create_fk_name(*args, **kwargs):
1489 return self.quote_name(self._create_index_name(*args, **kwargs))
1490
1491 return ForeignKeyName(
1492 model._meta.db_table,
1493 [field.column],
1494 split_identifier(field.target_field.model._meta.db_table)[1],
1495 [field.target_field.column],
1496 suffix,
1497 create_fk_name,
1498 )
1499
    def _delete_fk_sql(self, model, name):
        # Thin wrapper: drop a foreign-key constraint using the FK template.
        return self._delete_constraint_sql(self.sql_delete_fk, model, name)
1502
1503 def _deferrable_constraint_sql(self, deferrable):
1504 if deferrable is None:
1505 return ""
1506 if deferrable == Deferrable.DEFERRED:
1507 return " DEFERRABLE INITIALLY DEFERRED"
1508 if deferrable == Deferrable.IMMEDIATE:
1509 return " DEFERRABLE INITIALLY IMMEDIATE"
1510
1511 def _unique_sql(
1512 self,
1513 model,
1514 fields,
1515 name,
1516 condition=None,
1517 deferrable=None,
1518 include=None,
1519 opclasses=None,
1520 expressions=None,
1521 ):
1522 if (
1523 deferrable
1524 and not self.connection.features.supports_deferrable_unique_constraints
1525 ):
1526 return None
1527 if condition or include or opclasses or expressions:
1528 # Databases support conditional, covering, and functional unique
1529 # constraints via a unique index.
1530 sql = self._create_unique_sql(
1531 model,
1532 fields,
1533 name=name,
1534 condition=condition,
1535 include=include,
1536 opclasses=opclasses,
1537 expressions=expressions,
1538 )
1539 if sql:
1540 self.deferred_sql.append(sql)
1541 return None
1542 constraint = self.sql_unique_constraint % {
1543 "columns": ", ".join([self.quote_name(field.column) for field in fields]),
1544 "deferrable": self._deferrable_constraint_sql(deferrable),
1545 }
1546 return self.sql_constraint % {
1547 "name": self.quote_name(name),
1548 "constraint": constraint,
1549 }
1550
1551 def _create_unique_sql(
1552 self,
1553 model,
1554 fields,
1555 name=None,
1556 condition=None,
1557 deferrable=None,
1558 include=None,
1559 opclasses=None,
1560 expressions=None,
1561 ):
1562 if (
1563 (
1564 deferrable
1565 and not self.connection.features.supports_deferrable_unique_constraints
1566 )
1567 or (condition and not self.connection.features.supports_partial_indexes)
1568 or (include and not self.connection.features.supports_covering_indexes)
1569 or (
1570 expressions and not self.connection.features.supports_expression_indexes
1571 )
1572 ):
1573 return None
1574
1575 compiler = Query(model, alias_cols=False).get_compiler(
1576 connection=self.connection
1577 )
1578 table = model._meta.db_table
1579 columns = [field.column for field in fields]
1580 if name is None:
1581 name = self._unique_constraint_name(table, columns, quote=True)
1582 else:
1583 name = self.quote_name(name)
1584 if condition or include or opclasses or expressions:
1585 sql = self.sql_create_unique_index
1586 else:
1587 sql = self.sql_create_unique
1588 if columns:
1589 columns = self._index_columns(
1590 table, columns, col_suffixes=(), opclasses=opclasses
1591 )
1592 else:
1593 columns = Expressions(table, expressions, compiler, self.quote_value)
1594 return Statement(
1595 sql,
1596 table=Table(table, self.quote_name),
1597 name=name,
1598 columns=columns,
1599 condition=self._index_condition_sql(condition),
1600 deferrable=self._deferrable_constraint_sql(deferrable),
1601 include=self._index_include_sql(model, include),
1602 )
1603
1604 def _unique_constraint_name(self, table, columns, quote=True):
1605 if quote:
1606
1607 def create_unique_name(*args, **kwargs):
1608 return self.quote_name(self._create_index_name(*args, **kwargs))
1609
1610 else:
1611 create_unique_name = self._create_index_name
1612
1613 return IndexName(table, columns, "_uniq", create_unique_name)
1614
1615 def _delete_unique_sql(
1616 self,
1617 model,
1618 name,
1619 condition=None,
1620 deferrable=None,
1621 include=None,
1622 opclasses=None,
1623 expressions=None,
1624 ):
1625 if (
1626 (
1627 deferrable
1628 and not self.connection.features.supports_deferrable_unique_constraints
1629 )
1630 or (condition and not self.connection.features.supports_partial_indexes)
1631 or (include and not self.connection.features.supports_covering_indexes)
1632 or (
1633 expressions and not self.connection.features.supports_expression_indexes
1634 )
1635 ):
1636 return None
1637 if condition or include or opclasses or expressions:
1638 sql = self.sql_delete_index
1639 else:
1640 sql = self.sql_delete_unique
1641 return self._delete_constraint_sql(sql, model, name)
1642
1643 def _check_sql(self, name, check):
1644 return self.sql_constraint % {
1645 "name": self.quote_name(name),
1646 "constraint": self.sql_check_constraint % {"check": check},
1647 }
1648
1649 def _create_check_sql(self, model, name, check):
1650 if not self.connection.features.supports_table_check_constraints:
1651 return None
1652 return Statement(
1653 self.sql_create_check,
1654 table=Table(model._meta.db_table, self.quote_name),
1655 name=self.quote_name(name),
1656 check=check,
1657 )
1658
1659 def _delete_check_sql(self, model, name):
1660 if not self.connection.features.supports_table_check_constraints:
1661 return None
1662 return self._delete_constraint_sql(self.sql_delete_check, model, name)
1663
1664 def _delete_constraint_sql(self, template, model, name):
1665 return Statement(
1666 template,
1667 table=Table(model._meta.db_table, self.quote_name),
1668 name=self.quote_name(name),
1669 )
1670
1671 def _constraint_names(
1672 self,
1673 model,
1674 column_names=None,
1675 unique=None,
1676 primary_key=None,
1677 index=None,
1678 foreign_key=None,
1679 check=None,
1680 type_=None,
1681 exclude=None,
1682 ):
1683 """Return all constraint names matching the columns and conditions."""
1684 if column_names is not None:
1685 column_names = [
1686 self.connection.introspection.identifier_converter(
1687 truncate_name(name, self.connection.ops.max_name_length())
1688 )
1689 if self.connection.features.truncates_names
1690 else self.connection.introspection.identifier_converter(name)
1691 for name in column_names
1692 ]
1693 with self.connection.cursor() as cursor:
1694 constraints = self.connection.introspection.get_constraints(
1695 cursor, model._meta.db_table
1696 )
1697 result = []
1698 for name, infodict in constraints.items():
1699 if column_names is None or column_names == infodict["columns"]:
1700 if unique is not None and infodict["unique"] != unique:
1701 continue
1702 if primary_key is not None and infodict["primary_key"] != primary_key:
1703 continue
1704 if index is not None and infodict["index"] != index:
1705 continue
1706 if check is not None and infodict["check"] != check:
1707 continue
1708 if foreign_key is not None and not infodict["foreign_key"]:
1709 continue
1710 if type_ is not None and infodict["type"] != type_:
1711 continue
1712 if not exclude or name not in exclude:
1713 result.append(name)
1714 return result
1715
1716 def _delete_primary_key(self, model, strict=False):
1717 constraint_names = self._constraint_names(model, primary_key=True)
1718 if strict and len(constraint_names) != 1:
1719 raise ValueError(
1720 "Found wrong number ({}) of PK constraints for {}".format(
1721 len(constraint_names),
1722 model._meta.db_table,
1723 )
1724 )
1725 for constraint_name in constraint_names:
1726 self.execute(self._delete_primary_key_sql(model, constraint_name))
1727
1728 def _create_primary_key_sql(self, model, field):
1729 return Statement(
1730 self.sql_create_pk,
1731 table=Table(model._meta.db_table, self.quote_name),
1732 name=self.quote_name(
1733 self._create_index_name(
1734 model._meta.db_table, [field.column], suffix="_pk"
1735 )
1736 ),
1737 columns=Columns(model._meta.db_table, [field.column], self.quote_name),
1738 )
1739
    def _delete_primary_key_sql(self, model, name):
        # Thin wrapper: drop a primary-key constraint using the PK template.
        return self._delete_constraint_sql(self.sql_delete_pk, model, name)
1742
1743 def _collate_sql(self, collation, old_collation=None, table_name=None):
1744 return "COLLATE " + self.quote_name(collation) if collation else ""
1745
1746 def remove_procedure(self, procedure_name, param_types=()):
1747 sql = self.sql_delete_procedure % {
1748 "procedure": self.quote_name(procedure_name),
1749 "param_types": ",".join(param_types),
1750 }
1751 self.execute(sql)