1from __future__ import annotations
2
3from functools import cached_property
4from typing import TYPE_CHECKING, Any
5
6from plain import models
7from plain.models.base import ModelBase
8from plain.models.migrations.operations.base import Operation
9from plain.models.migrations.state import ModelState
10from plain.models.migrations.utils import field_references, resolve_relation
11
12from .fields import AddField, AlterField, FieldOperation, RemoveField, RenameField
13
14if TYPE_CHECKING:
15 from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
16 from plain.models.fields import Field
17 from plain.models.migrations.state import ProjectState
18
19
20def _check_for_duplicates(arg_name: str, objs: Any) -> None:
21 used_vals = set()
22 for val in objs:
23 if val in used_vals:
24 raise ValueError(
25 f"Found duplicate value {val} in CreateModel {arg_name} argument."
26 )
27 used_vals.add(val)
28
29
class ModelOperation(Operation):
    """Base class for operations that act on a single model, identified by name."""

    def __init__(self, name: str) -> None:
        self.name = name

    @cached_property
    def name_lower(self) -> str:
        """Lower-cased ``name``, computed once and cached on the instance."""
        lowered = self.name.lower()
        return lowered

    def references_model(self, name: str, package_label: str) -> bool:
        """Return True when ``name`` equals this operation's model name, ignoring case."""
        candidate = name.lower()
        return candidate == self.name_lower

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Try the parent's reduction first, then fall back to ``can_reduce_through``.

        A truthy result from the parent class is returned unchanged; otherwise
        the result of ``can_reduce_through`` is returned.
        """
        outcome = super().reduce(operation, package_label)  # type: ignore[misc]
        if outcome:
            return outcome
        return self.can_reduce_through(operation, package_label)

    def can_reduce_through(self, operation: Operation, package_label: str) -> bool:
        """Return True when ``operation`` does not reference this operation's model."""
        touches_model = operation.references_model(self.name, package_label)
        return not touches_model
50
51
class CreateModel(ModelOperation):
    """Create a model's table."""

    serialization_expand_args = ["fields", "options"]

    def __init__(
        self,
        name: str,
        fields: list[tuple[str, Field]],
        options: dict[str, Any] | None = None,
        bases: tuple[Any, ...] | None = None,
    ) -> None:
        """Store the model definition and reject duplicated fields or bases.

        Falsy ``options`` become an empty dict and falsy ``bases`` default to
        ``(models.Model,)``.

        Raises:
            ValueError: If a field name or a base occurs more than once.
        """

        def base_key(base: Any) -> Any:
            # Strings are compared case-insensitively, model classes by their
            # lower-cased label, and anything else by the object itself.
            if isinstance(base, str):
                return base.lower()
            if base is not models.Model and hasattr(base, "_model_meta"):
                return base.model_options.label_lower
            return base

        self.fields = fields
        self.options = options or {}
        self.bases = bases or (models.Model,)
        super().__init__(name)
        # Sanity-check that there are no duplicated field names or bases
        field_names = (field_name for field_name, _ in self.fields)
        _check_for_duplicates("fields", field_names)
        _check_for_duplicates("bases", (base_key(base) for base in self.bases))

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` describing this operation.

        ``options`` is only included when non-empty, and ``bases`` only when
        it differs from the default ``(models.Model,)``.
        """
        kwargs: dict[str, Any] = {"name": self.name, "fields": self.fields}
        if self.options:
            kwargs["options"] = self.options
        default_bases = (models.Model,)
        if self.bases and self.bases != default_bases:
            kwargs["bases"] = self.bases
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Add a ``ModelState`` built from copies of fields, options and bases."""
        model_state = ModelState(
            package_label,
            self.name,
            list(self.fields),
            dict(self.options),
            tuple(self.bases),
        )
        state.add_model(model_state)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Create the table for the model from ``to_state`` when migration is allowed."""
        model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        schema_editor.create_model(model)

    def describe(self) -> str:
        return f"Create model {self.name}"

    @property
    def migration_name_fragment(self) -> str:
        return self.name_lower

    def references_model(self, name: str, package_label: str) -> bool:
        """Return True if this operation is, inherits from, or has a field
        referencing the model ``name`` in ``package_label``."""
        name_lower = name.lower()
        if name_lower == self.name_lower:
            return True

        reference_model_tuple = (package_label, name_lower)

        # Check we didn't inherit from the model
        inherits_from_it = any(
            base is not models.Model
            and isinstance(base, ModelBase | str)
            and resolve_relation(base, package_label) == reference_model_tuple
            for base in self.bases
        )
        if inherits_from_it:
            return True

        # Check we have no FKs/M2Ms with it
        own_key = (package_label, self.name_lower)
        return any(
            field_references(own_key, field, reference_model_tuple)
            for _name, field in self.fields
        )

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Fold a later operation on the same model into a single ``CreateModel``.

        Handles a following ``DeleteModel`` (both operations cancel out),
        ``RenameModel``, ``AlterModelOptions`` and the add/alter/remove/rename
        field operations. Anything else is delegated to the parent class.
        """

        def replacement(
            name: str, fields: list[tuple[str, Field]], options: dict[str, Any]
        ) -> list[Operation]:
            # Every merged result is a fresh CreateModel that keeps our bases.
            return [CreateModel(name, fields=fields, options=options, bases=self.bases)]

        if (
            isinstance(operation, DeleteModel)
            and self.name_lower == operation.name_lower
        ):
            # Creating and then deleting the same model leaves nothing to do.
            return []

        if (
            isinstance(operation, RenameModel)
            and self.name_lower == operation.old_name_lower
        ):
            return replacement(operation.new_name, self.fields, self.options)

        if (
            isinstance(operation, AlterModelOptions)
            and self.name_lower == operation.name_lower
        ):
            merged = {**self.options, **operation.options}
            # Keys listed in ALTER_OPTION_KEYS that the later operation does
            # not set are removed from the merged options.
            for key in operation.ALTER_OPTION_KEYS:
                if key not in operation.options:
                    merged.pop(key, None)
            return replacement(self.name, self.fields, merged)

        if (
            isinstance(operation, FieldOperation)
            and self.name_lower == operation.model_name_lower
        ):
            if isinstance(operation, AddField):
                extended = self.fields + [(operation.name, operation.field)]
                return replacement(self.name, extended, self.options)
            if isinstance(operation, AlterField):
                swapped = [
                    (n, operation.field if n == operation.name else v)
                    for n, v in self.fields
                ]
                return replacement(self.name, swapped, self.options)
            if isinstance(operation, RemoveField):
                remaining = [
                    (n, v) for n, v in self.fields if n.lower() != operation.name_lower
                ]
                return replacement(self.name, remaining, self.options.copy())
            if isinstance(operation, RenameField):
                renamed = [
                    (operation.new_name if n == operation.old_name else n, v)
                    for n, v in self.fields
                ]
                return replacement(self.name, renamed, self.options.copy())

        return super().reduce(operation, package_label)
238
239
class DeleteModel(ModelOperation):
    """Drop a model's table."""

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name as the only kwarg."""
        kwargs: dict[str, Any] = {"name": self.name}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Remove the model from the project state."""
        state.remove_model(package_label, self.name_lower)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Drop the table of the model from ``from_state`` when migration is allowed."""
        model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        schema_editor.delete_model(model)

    def references_model(self, name: str, package_label: str) -> bool:
        """Always return True.

        The deleted model could be referencing the specified model through
        related fields.
        """
        return True

    def describe(self) -> str:
        return f"Delete model {self.name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"delete_{self.name_lower}"
274
275
class RenameModel(ModelOperation):
    """Rename a model."""

    def __init__(self, old_name: str, new_name: str) -> None:
        """Record both names; the inherited ``name`` is set to ``old_name``."""
        super().__init__(old_name)
        self.old_name = old_name
        self.new_name = new_name

    @cached_property
    def old_name_lower(self) -> str:
        """Lower-cased ``old_name``, cached on the instance."""
        lowered = self.old_name.lower()
        return lowered

    @cached_property
    def new_name_lower(self) -> str:
        """Lower-cased ``new_name``, cached on the instance."""
        lowered = self.new_name.lower()
        return lowered

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the old and new names."""
        kwargs: dict[str, Any] = {"old_name": self.old_name, "new_name": self.new_name}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Rename the model in the project state."""
        state.rename_model(package_label, self.old_name, self.new_name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the table and re-point the fields that relate to the model.

        Nothing happens unless ``allow_migrate_model`` accepts the renamed
        model taken from ``to_state``.
        """
        new_model = to_state.models_registry.get_model(package_label, self.new_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return

        old_model = from_state.models_registry.get_model(  # type: ignore[attr-defined]
            package_label, self.old_name
        )
        # Move the main table
        schema_editor.alter_db_table(
            new_model,
            old_model.model_options.db_table,
            new_model.model_options.db_table,
        )
        # Alter the fields pointing to us
        for related_object in old_model._model_meta.related_objects:
            related_model = related_object.related_model  # type: ignore[attr-defined]
            if related_model == old_model:
                # The related model is the renamed model itself, so the
                # field has to be looked up under the new name.
                model = new_model
                related_key = (package_label, self.new_name_lower)
            else:
                model = related_model
                related_key = (
                    related_model.model_options.package_label,
                    related_model.model_options.model_name,
                )
            target_model = to_state.models_registry.get_model(*related_key)
            to_field = target_model._model_meta.get_field(related_object.field.name)
            schema_editor.alter_field(model, related_object.field, to_field)

    def references_model(self, name: str, package_label: str) -> bool:
        """Return True when ``name`` matches the old or the new name, ignoring case."""
        lowered = name.lower()
        return lowered == self.old_name_lower or lowered == self.new_name_lower

    def describe(self) -> str:
        return f"Rename model {self.old_name} to {self.new_name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"rename_{self.old_name_lower}_{self.new_name_lower}"

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Collapse chained renames; otherwise decide whether ``operation`` can pass.

        A following ``RenameModel`` that starts from our ``new_name`` is merged
        into one rename from ``old_name`` to its ``new_name``.
        """
        if (
            isinstance(operation, RenameModel)
            and self.new_name_lower == operation.old_name_lower
        ):
            return [RenameModel(self.old_name, operation.new_name)]
        # Skip `ModelOperation.reduce` as we want to run `references_model`
        # against self.new_name.
        outcome = super(ModelOperation, self).reduce(operation, package_label)  # type: ignore[misc]
        if outcome:
            return outcome
        return not operation.references_model(self.new_name, package_label)
370
371
class ModelOptionOperation(ModelOperation):
    """Base class for operations that change an option of a single model."""

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Let a later operation of the same class, or a ``DeleteModel``, on the
        same model replace this one; otherwise defer to the parent class."""
        if not (
            isinstance(operation, self.__class__ | DeleteModel)
            and self.name_lower == operation.name_lower
        ):
            return super().reduce(operation, package_label)
        # Only the later operation is kept.
        return [operation]
382
383
class AlterModelTable(ModelOptionOperation):
    """Rename a model's table."""

    def __init__(self, name: str, table: str | None) -> None:
        """Record the model ``name`` and the new ``table`` value (may be ``None``)."""
        self.table = table
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and table."""
        kwargs: dict[str, Any] = {"name": self.name, "table": self.table}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Set the model's ``db_table`` option in the project state."""
        new_options = {"db_table": self.table}
        state.alter_model_options(package_label, self.name_lower, new_options)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the table from its ``from_state`` name to its ``to_state`` name
        when migration is allowed for the model."""
        new_model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return
        old_model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        schema_editor.alter_db_table(
            new_model,
            old_model.model_options.db_table,
            new_model.model_options.db_table,
        )

    def describe(self) -> str:
        """Describe the rename, showing ``(default)`` when ``table`` is ``None``."""
        shown_table = self.table if self.table is not None else "(default)"
        return "Rename table for {} to {}".format(self.name, shown_table)

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_table"
428
429
class AlterModelTableComment(ModelOptionOperation):
    """Change the ``db_table_comment`` option of a model."""

    def __init__(self, name: str, table_comment: str | None) -> None:
        """Record the model ``name`` and the new ``table_comment`` (may be ``None``)."""
        self.table_comment = table_comment
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and comment."""
        kwargs: dict[str, Any] = {
            "name": self.name,
            "table_comment": self.table_comment,
        }
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Set the model's ``db_table_comment`` option in the project state."""
        new_options = {"db_table_comment": self.table_comment}
        state.alter_model_options(package_label, self.name_lower, new_options)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Change the table comment from its ``from_state`` value to its
        ``to_state`` value when migration is allowed for the model."""
        new_model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return
        old_model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        schema_editor.alter_db_table_comment(
            new_model,
            old_model.model_options.db_table_comment,
            new_model.model_options.db_table_comment,
        )

    def describe(self) -> str:
        return f"Alter {self.name} table comment"

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_table_comment"
469
470
class AlterModelOptions(ModelOptionOperation):
    """
    Set new model options that don't directly affect the database schema
    (like ordering). Python code in migrations may still need them, so they
    are recorded in the project state even though no database work is done.
    """

    # Model options we want to compare and preserve in an AlterModelOptions op
    ALTER_OPTION_KEYS = [
        "ordering",
    ]

    def __init__(self, name: str, options: dict[str, Any]) -> None:
        """Record the model ``name`` and the ``options`` to apply."""
        self.options = options
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and options."""
        kwargs: dict[str, Any] = {"name": self.name, "options": self.options}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Apply ``options`` to the model in the project state, passing
        ``ALTER_OPTION_KEYS`` along as the set of tracked option keys."""
        state.alter_model_options(
            package_label, self.name_lower, self.options, self.ALTER_OPTION_KEYS
        )

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Do nothing: this operation makes no database changes."""
        return None

    def describe(self) -> str:
        return f"Change Meta options on {self.name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_options"
517
518
class IndexOperation(Operation):
    """Base class for operations that carry a ``model_name`` attribute.

    ``option_name`` defaults to ``"indexes"``; subclasses may override it.
    """

    option_name = "indexes"

    @cached_property
    def model_name_lower(self) -> str:
        """Lower-cased ``model_name``, cached on the instance."""
        model_name = self.model_name  # type: ignore[attr-defined]
        return model_name.lower()
525
526
class AddIndex(IndexOperation):
    """Add an index on a model."""

    def __init__(self, model_name: str, index: Any) -> None:
        """Record the target model and the index to add.

        Raises:
            ValueError: If ``index.name`` is falsy.
        """
        self.model_name = model_name
        if index.name:  # type: ignore[attr-defined]
            self.index = index
            return
        raise ValueError(
            "Indexes passed to AddIndex operations require a name "
            f"argument. {index!r} doesn't have one."
        )

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Add the index to the model in the project state."""
        state.add_index(package_label, self.model_name_lower, self.index)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Create the index on the model from ``to_state`` when migration is allowed."""
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        schema_editor.add_index(model, self.index)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and index."""
        kwargs: dict[str, Any] = {"model_name": self.model_name, "index": self.index}
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        """Describe the index by its expressions when it has any, else by its fields."""
        index = self.index
        if index.expressions:  # type: ignore[attr-defined]
            rendered = ", ".join(str(expression) for expression in index.expressions)  # type: ignore[attr-defined]
            return "Create index {} on {} on model {}".format(
                index.name,  # type: ignore[attr-defined]
                rendered,
                self.model_name,
            )
        field_list = ", ".join(index.fields)  # type: ignore[attr-defined]
        return "Create index {} on field(s) {} of model {}".format(
            index.name,  # type: ignore[attr-defined]
            field_list,
            self.model_name,
        )

    @property
    def migration_name_fragment(self) -> str:
        return f"{self.model_name_lower}_{self.index.name.lower()}"  # type: ignore[attr-defined]
580
581
class RemoveIndex(IndexOperation):
    """Remove an index from a model."""

    def __init__(self, model_name: str, name: str) -> None:
        """Record the target model and the name of the index to remove."""
        self.name = name
        self.model_name = model_name

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Remove the named index from the model in the project state."""
        state.remove_index(package_label, self.model_name_lower, self.name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Drop the index, resolved by name in ``from_state``, when migration is allowed."""
        model = from_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        model_key = (package_label, self.model_name_lower)
        index = from_state.models[model_key].get_index_by_name(self.name)
        schema_editor.remove_index(model, index)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and index name."""
        kwargs: dict[str, Any] = {"model_name": self.model_name, "name": self.name}
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Remove index {self.name} from {self.model_name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"remove_{self.model_name_lower}_{self.name.lower()}"
622
623
class RenameIndex(IndexOperation):
    """Rename an index."""

    def __init__(
        self,
        model_name: str,
        new_name: str,
        old_name: str | None = None,
        old_fields: list[str] | tuple[str, ...] | None = None,
    ) -> None:
        """Record the rename; exactly one of ``old_name``/``old_fields`` must be given.

        Raises:
            ValueError: If neither or both of ``old_name`` and ``old_fields``
                are truthy.
        """
        if not (old_name or old_fields):
            raise ValueError(
                "RenameIndex requires one of old_name and old_fields arguments to be "
                "set."
            )
        if old_name and old_fields:
            raise ValueError(
                "RenameIndex.old_name and old_fields are mutually exclusive."
            )
        self.model_name = model_name
        self.new_name = new_name
        self.old_name = old_name
        self.old_fields = old_fields

    @cached_property
    def old_name_lower(self) -> str:
        """Lower-cased ``old_name``; only usable when ``old_name`` is set."""
        old_name = self.old_name
        return old_name.lower()  # type: ignore[union-attr]

    @cached_property
    def new_name_lower(self) -> str:
        """Lower-cased ``new_name``, cached on the instance."""
        lowered = self.new_name.lower()
        return lowered

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)``, including only the truthy one of
        ``old_name`` and ``old_fields``."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "new_name": self.new_name,
        }
        for key, value in (("old_name", self.old_name), ("old_fields", self.old_fields)):
            if value:
                kwargs[key] = value
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Update the project state.

        With ``old_fields`` a new named index over those fields is added;
        otherwise the existing index is renamed.
        """
        if not self.old_fields:
            state.rename_index(
                package_label,
                self.model_name_lower,
                self.old_name,
                self.new_name,  # type: ignore[arg-type]
            )
            return
        named_index = models.Index(fields=self.old_fields, name=self.new_name)
        state.add_index(package_label, self.model_name_lower, named_index)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the index in the database when migration is allowed.

        With ``old_fields`` the current index name is looked up through the
        schema editor; otherwise the old index is taken from ``from_state``.

        Raises:
            ValueError: If the ``old_fields`` lookup does not find exactly
                one index.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return None

        if self.old_fields:
            from_model = from_state.models_registry.get_model(  # type: ignore[attr-defined]
                package_label, self.model_name
            )
            columns = [
                from_model._model_meta.get_field(field).column
                for field in self.old_fields
            ]
            matching_index_name = schema_editor._constraint_names(
                from_model, column_names=columns, index=True
            )
            found = len(matching_index_name)
            if found != 1:
                raise ValueError(
                    "Found wrong number ({}) of indexes for {}({}).".format(
                        found,
                        from_model.model_options.db_table,
                        ", ".join(columns),
                    )
                )
            old_index = models.Index(
                fields=self.old_fields,
                name=matching_index_name[0],
            )
        else:
            from_model_state = from_state.models[package_label, self.model_name_lower]
            old_index = from_model_state.get_index_by_name(self.old_name)  # type: ignore[arg-type]
        # Don't alter when the index name is not changed.
        if old_index.name == self.new_name:  # type: ignore[attr-defined]
            return None

        to_model_state = to_state.models[package_label, self.model_name_lower]
        new_index = to_model_state.get_index_by_name(self.new_name)
        schema_editor.rename_index(model, old_index, new_index)
        return None

    def describe(self) -> str:
        """Describe the rename, by old name when set, else by the old fields."""
        if not self.old_name:
            return (
                f"Rename unnamed index for {self.old_fields} on {self.model_name} to "
                f"{self.new_name}"
            )
        return (
            f"Rename index {self.old_name} on {self.model_name} to {self.new_name}"
        )

    @property
    def migration_name_fragment(self) -> str:
        if self.old_name:
            return f"rename_{self.old_name_lower}_{self.new_name_lower}"
        joined_fields = "_".join(self.old_fields)  # type: ignore[arg-type]
        return "rename_{}_{}_{}".format(
            self.model_name_lower,
            joined_fields,
            self.new_name_lower,
        )

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Merge a following ``RenameIndex`` on the same model that starts from
        our ``new_name``; otherwise defer to the parent class."""
        if not (
            isinstance(operation, RenameIndex)
            and self.model_name_lower == operation.model_name_lower
            and operation.old_name
            and self.new_name_lower == operation.old_name_lower
        ):
            return super().reduce(operation, package_label)
        # Keep our starting point and adopt the later operation's final name.
        merged = RenameIndex(
            self.model_name,
            new_name=operation.new_name,
            old_name=self.old_name,
            old_fields=self.old_fields,
        )
        return [merged]
766
767
class AddConstraint(IndexOperation):
    """Add a constraint to a model."""

    option_name = "constraints"

    def __init__(self, model_name: str, constraint: Any) -> None:
        """Record the target model and the constraint to add."""
        self.model_name = model_name
        self.constraint = constraint

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Add the constraint to the model in the project state."""
        state.add_constraint(package_label, self.model_name_lower, self.constraint)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Create the constraint on the model from ``to_state`` when migration is allowed."""
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if self.allow_migrate_model(schema_editor.connection, model):
            schema_editor.add_constraint(model, self.constraint)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and constraint."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "constraint": self.constraint,
        }
        # Use __qualname__ (not __name__) like the other operations in this
        # module, so a nested subclass keeps its full dotted path.
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Create constraint {self.constraint.name} on model {self.model_name}"  # type: ignore[attr-defined]

    @property
    def migration_name_fragment(self) -> str:
        return f"{self.model_name_lower}_{self.constraint.name.lower()}"  # type: ignore[attr-defined]
805
806
class RemoveConstraint(IndexOperation):
    """Remove a constraint from a model."""

    option_name = "constraints"

    def __init__(self, model_name: str, name: str) -> None:
        """Record the target model and the name of the constraint to remove."""
        self.model_name = model_name
        self.name = name

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Remove the named constraint from the model in the project state."""
        state.remove_constraint(package_label, self.model_name_lower, self.name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Drop the constraint when migration is allowed for the model.

        The model is taken from ``to_state``, while the constraint object is
        resolved by name in ``from_state``.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if self.allow_migrate_model(schema_editor.connection, model):
            from_model_state = from_state.models[package_label, self.model_name_lower]
            constraint = from_model_state.get_constraint_by_name(self.name)
            schema_editor.remove_constraint(model, constraint)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualname, [], kwargs)`` with the model name and constraint name."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "name": self.name,
        }
        # Use __qualname__ (not __name__) like the other operations in this
        # module, so a nested subclass keeps its full dotted path.
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Remove constraint {self.name} from model {self.model_name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"remove_{self.model_name_lower}_{self.name.lower()}"