1from __future__ import annotations
2
3from functools import cached_property
4from typing import TYPE_CHECKING, Any
5
6from plain import models
7from plain.models.base import ModelBase
8from plain.models.migrations.operations.base import Operation
9from plain.models.migrations.state import ModelState
10from plain.models.migrations.utils import field_references, resolve_relation
11
12from .fields import AddField, AlterField, FieldOperation, RemoveField, RenameField
13
14if TYPE_CHECKING:
15 from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
16 from plain.models.fields import Field
17 from plain.models.migrations.state import ProjectState
18
19
20def _check_for_duplicates(arg_name: str, objs: Any) -> None:
21 used_vals = set()
22 for val in objs:
23 if val in used_vals:
24 raise ValueError(
25 f"Found duplicate value {val} in CreateModel {arg_name} argument."
26 )
27 used_vals.add(val)
28
29
30class ModelOperation(Operation):
31 def __init__(self, name: str) -> None:
32 self.name = name
33
34 @cached_property
35 def name_lower(self) -> str:
36 return self.name.lower()
37
38 def references_model(self, name: str, package_label: str) -> bool:
39 return name.lower() == self.name_lower
40
41 def reduce(
42 self, operation: Operation, package_label: str
43 ) -> bool | list[Operation]:
44 return super().reduce(operation, package_label) or self.can_reduce_through(
45 operation, package_label
46 )
47
48 def can_reduce_through(self, operation: Operation, package_label: str) -> bool:
49 return not operation.references_model(self.name, package_label)
50
51
52class CreateModel(ModelOperation):
53 """Create a model's table."""
54
55 serialization_expand_args = ["fields", "options"]
56
57 def __init__(
58 self,
59 name: str,
60 fields: list[tuple[str, Field]],
61 options: dict[str, Any] | None = None,
62 bases: tuple[Any, ...] | None = None,
63 ) -> None:
64 self.fields = fields
65 self.options = options or {}
66 self.bases = bases or (models.Model,)
67 super().__init__(name)
68 # Sanity-check that there are no duplicated field names or bases
69 _check_for_duplicates("fields", (name for name, _ in self.fields))
70 _check_for_duplicates(
71 "bases",
72 (
73 base.model_options.label_lower
74 if not isinstance(base, str)
75 and base is not models.Model
76 and hasattr(base, "_model_meta")
77 else base.lower()
78 if isinstance(base, str)
79 else base
80 for base in self.bases
81 ),
82 )
83
84 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
85 kwargs: dict[str, Any] = {
86 "name": self.name,
87 "fields": self.fields,
88 }
89 if self.options:
90 kwargs["options"] = self.options
91 if self.bases and self.bases != (models.Model,):
92 kwargs["bases"] = self.bases
93 return (self.__class__.__qualname__, [], kwargs)
94
95 def state_forwards(self, package_label: str, state: ProjectState) -> None:
96 state.add_model(
97 ModelState(
98 package_label,
99 self.name,
100 list(self.fields),
101 dict(self.options),
102 tuple(self.bases),
103 )
104 )
105
106 def database_forwards(
107 self,
108 package_label: str,
109 schema_editor: BaseDatabaseSchemaEditor,
110 from_state: ProjectState,
111 to_state: ProjectState,
112 ) -> None:
113 model = to_state.models_registry.get_model(package_label, self.name)
114 if self.allow_migrate_model(schema_editor.connection, model):
115 schema_editor.create_model(model)
116
117 def describe(self) -> str:
118 return f"Create model {self.name}"
119
120 @property
121 def migration_name_fragment(self) -> str:
122 return self.name_lower
123
124 def references_model(self, name: str, package_label: str) -> bool:
125 name_lower = name.lower()
126 if name_lower == self.name_lower:
127 return True
128
129 # Check we didn't inherit from the model
130 reference_model_tuple = (package_label, name_lower)
131 for base in self.bases:
132 if (
133 base is not models.Model
134 and isinstance(base, ModelBase | str)
135 and resolve_relation(base, package_label) == reference_model_tuple
136 ):
137 return True
138
139 # Check we have no FKs/M2Ms with it
140 for _name, field in self.fields:
141 if field_references(
142 (package_label, self.name_lower), field, reference_model_tuple
143 ):
144 return True
145 return False
146
147 def reduce(
148 self, operation: Operation, package_label: str
149 ) -> bool | list[Operation]:
150 if (
151 isinstance(operation, DeleteModel)
152 and self.name_lower == operation.name_lower
153 ):
154 return []
155 elif (
156 isinstance(operation, RenameModel)
157 and self.name_lower == operation.old_name_lower
158 ):
159 return [
160 CreateModel(
161 operation.new_name,
162 fields=self.fields,
163 options=self.options,
164 bases=self.bases,
165 ),
166 ]
167 elif (
168 isinstance(operation, AlterModelOptions)
169 and self.name_lower == operation.name_lower
170 ):
171 options = {**self.options, **operation.options}
172 for key in operation.ALTER_OPTION_KEYS:
173 if key not in operation.options:
174 options.pop(key, None)
175 return [
176 CreateModel(
177 self.name,
178 fields=self.fields,
179 options=options,
180 bases=self.bases,
181 ),
182 ]
183 elif (
184 isinstance(operation, FieldOperation)
185 and self.name_lower == operation.model_name_lower
186 ):
187 if isinstance(operation, AddField):
188 assert operation.field is not None
189 return [
190 CreateModel(
191 self.name,
192 fields=self.fields + [(operation.name, operation.field)],
193 options=self.options,
194 bases=self.bases,
195 ),
196 ]
197 elif isinstance(operation, AlterField):
198 assert operation.field is not None
199 return [
200 CreateModel(
201 self.name,
202 fields=[
203 (n, operation.field if n == operation.name else v)
204 for n, v in self.fields
205 ],
206 options=self.options,
207 bases=self.bases,
208 ),
209 ]
210 elif isinstance(operation, RemoveField):
211 options = self.options.copy()
212
213 return [
214 CreateModel(
215 self.name,
216 fields=[
217 (n, v)
218 for n, v in self.fields
219 if n.lower() != operation.name_lower
220 ],
221 options=options,
222 bases=self.bases,
223 ),
224 ]
225 elif isinstance(operation, RenameField):
226 options = self.options.copy()
227
228 return [
229 CreateModel(
230 self.name,
231 fields=[
232 (operation.new_name if n == operation.old_name else n, v)
233 for n, v in self.fields
234 ],
235 options=options,
236 bases=self.bases,
237 ),
238 ]
239 return super().reduce(operation, package_label)
240
241
242class DeleteModel(ModelOperation):
243 """Drop a model's table."""
244
245 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
246 kwargs: dict[str, Any] = {
247 "name": self.name,
248 }
249 return (self.__class__.__qualname__, [], kwargs)
250
251 def state_forwards(self, package_label: str, state: ProjectState) -> None:
252 state.remove_model(package_label, self.name_lower)
253
254 def database_forwards(
255 self,
256 package_label: str,
257 schema_editor: BaseDatabaseSchemaEditor,
258 from_state: ProjectState,
259 to_state: ProjectState,
260 ) -> None:
261 model = from_state.models_registry.get_model(package_label, self.name)
262 if self.allow_migrate_model(schema_editor.connection, model):
263 schema_editor.delete_model(model)
264
265 def references_model(self, name: str, package_label: str) -> bool:
266 # The deleted model could be referencing the specified model through
267 # related fields.
268 return True
269
270 def describe(self) -> str:
271 return f"Delete model {self.name}"
272
273 @property
274 def migration_name_fragment(self) -> str:
275 return f"delete_{self.name_lower}"
276
277
278class RenameModel(ModelOperation):
279 """Rename a model."""
280
281 def __init__(self, old_name: str, new_name: str) -> None:
282 self.old_name = old_name
283 self.new_name = new_name
284 super().__init__(old_name)
285
286 @cached_property
287 def old_name_lower(self) -> str:
288 return self.old_name.lower()
289
290 @cached_property
291 def new_name_lower(self) -> str:
292 return self.new_name.lower()
293
294 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
295 kwargs: dict[str, Any] = {
296 "old_name": self.old_name,
297 "new_name": self.new_name,
298 }
299 return (self.__class__.__qualname__, [], kwargs)
300
301 def state_forwards(self, package_label: str, state: ProjectState) -> None:
302 state.rename_model(package_label, self.old_name, self.new_name)
303
304 def database_forwards(
305 self,
306 package_label: str,
307 schema_editor: BaseDatabaseSchemaEditor,
308 from_state: ProjectState,
309 to_state: ProjectState,
310 ) -> None:
311 new_model = to_state.models_registry.get_model(package_label, self.new_name)
312 if self.allow_migrate_model(schema_editor.connection, new_model):
313 old_model = from_state.models_registry.get_model(
314 package_label, self.old_name
315 )
316 # Move the main table
317 schema_editor.alter_db_table(
318 new_model,
319 old_model.model_options.db_table,
320 new_model.model_options.db_table,
321 )
322 # Alter the fields pointing to us
323 for related_object in old_model._model_meta.related_objects:
324 if related_object.related_model == old_model:
325 model = new_model
326 related_key = (package_label, self.new_name_lower)
327 else:
328 model = related_object.related_model
329 related_key = (
330 related_object.related_model.model_options.package_label,
331 related_object.related_model.model_options.model_name,
332 )
333 to_field = to_state.models_registry.get_model(
334 *related_key
335 )._model_meta.get_field(related_object.field.name)
336 schema_editor.alter_field(
337 model,
338 related_object.field,
339 to_field,
340 )
341
342 def references_model(self, name: str, package_label: str) -> bool:
343 return (
344 name.lower() == self.old_name_lower or name.lower() == self.new_name_lower
345 )
346
347 def describe(self) -> str:
348 return f"Rename model {self.old_name} to {self.new_name}"
349
350 @property
351 def migration_name_fragment(self) -> str:
352 return f"rename_{self.old_name_lower}_{self.new_name_lower}"
353
354 def reduce(
355 self, operation: Operation, package_label: str
356 ) -> bool | list[Operation]:
357 if (
358 isinstance(operation, RenameModel)
359 and self.new_name_lower == operation.old_name_lower
360 ):
361 return [
362 RenameModel(
363 self.old_name,
364 operation.new_name,
365 ),
366 ]
367 # Skip `ModelOperation.reduce` as we want to run `references_model`
368 # against self.new_name.
369 return super(ModelOperation, self).reduce(
370 operation, package_label
371 ) or not operation.references_model(self.new_name, package_label)
372
373
374class ModelOptionOperation(ModelOperation):
375 def reduce(
376 self, operation: Operation, package_label: str
377 ) -> bool | list[Operation]:
378 # Use tuple syntax because self.__class__ is not compatible with union syntax in isinstance
379 if isinstance(operation, (self.__class__, DeleteModel)) and ( # noqa: UP038
380 self.name_lower == operation.name_lower
381 ):
382 return [operation]
383 return super().reduce(operation, package_label)
384
385
386class AlterModelTable(ModelOptionOperation):
387 """Rename a model's table."""
388
389 def __init__(self, name: str, table: str | None) -> None:
390 self.table = table
391 super().__init__(name)
392
393 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
394 kwargs: dict[str, Any] = {
395 "name": self.name,
396 "table": self.table,
397 }
398 return (self.__class__.__qualname__, [], kwargs)
399
400 def state_forwards(self, package_label: str, state: ProjectState) -> None:
401 state.alter_model_options(
402 package_label, self.name_lower, {"db_table": self.table}
403 )
404
405 def database_forwards(
406 self,
407 package_label: str,
408 schema_editor: BaseDatabaseSchemaEditor,
409 from_state: ProjectState,
410 to_state: ProjectState,
411 ) -> None:
412 new_model = to_state.models_registry.get_model(package_label, self.name)
413 if self.allow_migrate_model(schema_editor.connection, new_model):
414 old_model = from_state.models_registry.get_model(package_label, self.name)
415 schema_editor.alter_db_table(
416 new_model,
417 old_model.model_options.db_table,
418 new_model.model_options.db_table,
419 )
420
421 def describe(self) -> str:
422 return "Rename table for {} to {}".format(
423 self.name,
424 self.table if self.table is not None else "(default)",
425 )
426
427 @property
428 def migration_name_fragment(self) -> str:
429 return f"alter_{self.name_lower}_table"
430
431
432class AlterModelTableComment(ModelOptionOperation):
433 def __init__(self, name: str, table_comment: str | None) -> None:
434 self.table_comment = table_comment
435 super().__init__(name)
436
437 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
438 kwargs: dict[str, Any] = {
439 "name": self.name,
440 "table_comment": self.table_comment,
441 }
442 return (self.__class__.__qualname__, [], kwargs)
443
444 def state_forwards(self, package_label: str, state: ProjectState) -> None:
445 state.alter_model_options(
446 package_label, self.name_lower, {"db_table_comment": self.table_comment}
447 )
448
449 def database_forwards(
450 self,
451 package_label: str,
452 schema_editor: BaseDatabaseSchemaEditor,
453 from_state: ProjectState,
454 to_state: ProjectState,
455 ) -> None:
456 new_model = to_state.models_registry.get_model(package_label, self.name)
457 if self.allow_migrate_model(schema_editor.connection, new_model):
458 old_model = from_state.models_registry.get_model(package_label, self.name)
459 schema_editor.alter_db_table_comment(
460 new_model,
461 old_model.model_options.db_table_comment,
462 new_model.model_options.db_table_comment,
463 )
464
465 def describe(self) -> str:
466 return f"Alter {self.name} table comment"
467
468 @property
469 def migration_name_fragment(self) -> str:
470 return f"alter_{self.name_lower}_table_comment"
471
472
473class AlterModelOptions(ModelOptionOperation):
474 """
475 Set new model options that don't directly affect the database schema
476 (like ordering). Python code in migrations
477 may still need them.
478 """
479
480 # Model options we want to compare and preserve in an AlterModelOptions op
481 ALTER_OPTION_KEYS = [
482 "ordering",
483 ]
484
485 def __init__(self, name: str, options: dict[str, Any]) -> None:
486 self.options = options
487 super().__init__(name)
488
489 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
490 kwargs: dict[str, Any] = {
491 "name": self.name,
492 "options": self.options,
493 }
494 return (self.__class__.__qualname__, [], kwargs)
495
496 def state_forwards(self, package_label: str, state: ProjectState) -> None:
497 state.alter_model_options(
498 package_label,
499 self.name_lower,
500 self.options,
501 self.ALTER_OPTION_KEYS,
502 )
503
504 def database_forwards(
505 self,
506 package_label: str,
507 schema_editor: BaseDatabaseSchemaEditor,
508 from_state: ProjectState,
509 to_state: ProjectState,
510 ) -> None:
511 pass
512
513 def describe(self) -> str:
514 return f"Change Meta options on {self.name}"
515
516 @property
517 def migration_name_fragment(self) -> str:
518 return f"alter_{self.name_lower}_options"
519
520
521class IndexOperation(Operation):
522 option_name = "indexes"
523 model_name: str # Set by subclasses
524
525 @cached_property
526 def model_name_lower(self) -> str:
527 return self.model_name.lower()
528
529
530class AddIndex(IndexOperation):
531 """Add an index on a model."""
532
533 def __init__(self, model_name: str, index: Any) -> None:
534 self.model_name = model_name
535 if not index.name:
536 raise ValueError(
537 "Indexes passed to AddIndex operations require a name "
538 f"argument. {index!r} doesn't have one."
539 )
540 self.index = index
541
542 def state_forwards(self, package_label: str, state: ProjectState) -> None:
543 state.add_index(package_label, self.model_name_lower, self.index)
544
545 def database_forwards(
546 self,
547 package_label: str,
548 schema_editor: BaseDatabaseSchemaEditor,
549 from_state: ProjectState,
550 to_state: ProjectState,
551 ) -> None:
552 model = to_state.models_registry.get_model(package_label, self.model_name)
553 if self.allow_migrate_model(schema_editor.connection, model):
554 schema_editor.add_index(model, self.index)
555
556 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
557 kwargs: dict[str, Any] = {
558 "model_name": self.model_name,
559 "index": self.index,
560 }
561 return (
562 self.__class__.__qualname__,
563 [],
564 kwargs,
565 )
566
567 def describe(self) -> str:
568 if self.index.expressions:
569 return "Create index {} on {} on model {}".format(
570 self.index.name,
571 ", ".join([str(expression) for expression in self.index.expressions]),
572 self.model_name,
573 )
574 return "Create index {} on field(s) {} of model {}".format(
575 self.index.name,
576 ", ".join(self.index.fields),
577 self.model_name,
578 )
579
580 @property
581 def migration_name_fragment(self) -> str:
582 return f"{self.model_name_lower}_{self.index.name.lower()}"
583
584
585class RemoveIndex(IndexOperation):
586 """Remove an index from a model."""
587
588 def __init__(self, model_name: str, name: str) -> None:
589 self.model_name = model_name
590 self.name = name
591
592 def state_forwards(self, package_label: str, state: ProjectState) -> None:
593 state.remove_index(package_label, self.model_name_lower, self.name)
594
595 def database_forwards(
596 self,
597 package_label: str,
598 schema_editor: BaseDatabaseSchemaEditor,
599 from_state: ProjectState,
600 to_state: ProjectState,
601 ) -> None:
602 model = from_state.models_registry.get_model(package_label, self.model_name)
603 if self.allow_migrate_model(schema_editor.connection, model):
604 from_model_state = from_state.models[package_label, self.model_name_lower]
605 index = from_model_state.get_index_by_name(self.name)
606 schema_editor.remove_index(model, index)
607
608 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
609 kwargs: dict[str, Any] = {
610 "model_name": self.model_name,
611 "name": self.name,
612 }
613 return (
614 self.__class__.__qualname__,
615 [],
616 kwargs,
617 )
618
619 def describe(self) -> str:
620 return f"Remove index {self.name} from {self.model_name}"
621
622 @property
623 def migration_name_fragment(self) -> str:
624 return f"remove_{self.model_name_lower}_{self.name.lower()}"
625
626
627class RenameIndex(IndexOperation):
628 """Rename an index."""
629
630 def __init__(
631 self,
632 model_name: str,
633 new_name: str,
634 old_name: str | None = None,
635 old_fields: list[str] | tuple[str, ...] | None = None,
636 ) -> None:
637 if not old_name and not old_fields:
638 raise ValueError(
639 "RenameIndex requires one of old_name and old_fields arguments to be "
640 "set."
641 )
642 if old_name and old_fields:
643 raise ValueError(
644 "RenameIndex.old_name and old_fields are mutually exclusive."
645 )
646 self.model_name = model_name
647 self.new_name = new_name
648 self.old_name = old_name
649 self.old_fields = old_fields
650
651 @cached_property
652 def old_name_lower(self) -> str:
653 assert self.old_name is not None, "old_name is set during initialization"
654 return self.old_name.lower()
655
656 @cached_property
657 def new_name_lower(self) -> str:
658 return self.new_name.lower()
659
660 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
661 kwargs: dict[str, Any] = {
662 "model_name": self.model_name,
663 "new_name": self.new_name,
664 }
665 if self.old_name:
666 kwargs["old_name"] = self.old_name
667 if self.old_fields:
668 kwargs["old_fields"] = self.old_fields
669 return (self.__class__.__qualname__, [], kwargs)
670
671 def state_forwards(self, package_label: str, state: ProjectState) -> None:
672 if self.old_fields:
673 state.add_index(
674 package_label,
675 self.model_name_lower,
676 models.Index(fields=self.old_fields, name=self.new_name),
677 )
678 else:
679 assert self.old_name is not None
680 state.rename_index(
681 package_label,
682 self.model_name_lower,
683 self.old_name,
684 self.new_name,
685 )
686
687 def database_forwards(
688 self,
689 package_label: str,
690 schema_editor: BaseDatabaseSchemaEditor,
691 from_state: ProjectState,
692 to_state: ProjectState,
693 ) -> None:
694 model = to_state.models_registry.get_model(package_label, self.model_name)
695 if not self.allow_migrate_model(schema_editor.connection, model):
696 return None
697
698 if self.old_fields:
699 from_model = from_state.models_registry.get_model(
700 package_label, self.model_name
701 )
702 columns = [
703 from_model._model_meta.get_forward_field(field).column
704 for field in self.old_fields
705 ]
706 matching_index_name = schema_editor._constraint_names(
707 from_model, column_names=columns, index=True
708 )
709 if len(matching_index_name) != 1:
710 raise ValueError(
711 "Found wrong number ({}) of indexes for {}({}).".format(
712 len(matching_index_name),
713 from_model.model_options.db_table,
714 ", ".join(columns),
715 )
716 )
717 old_index = models.Index(
718 fields=self.old_fields,
719 name=matching_index_name[0],
720 )
721 else:
722 from_model_state = from_state.models[package_label, self.model_name_lower]
723 assert self.old_name is not None
724 old_index = from_model_state.get_index_by_name(self.old_name)
725 # Don't alter when the index name is not changed.
726 if old_index.name == self.new_name:
727 return None
728
729 to_model_state = to_state.models[package_label, self.model_name_lower]
730 new_index = to_model_state.get_index_by_name(self.new_name)
731 schema_editor.rename_index(model, old_index, new_index)
732 return None
733
734 def describe(self) -> str:
735 if self.old_name:
736 return (
737 f"Rename index {self.old_name} on {self.model_name} to {self.new_name}"
738 )
739 return (
740 f"Rename unnamed index for {self.old_fields} on {self.model_name} to "
741 f"{self.new_name}"
742 )
743
744 @property
745 def migration_name_fragment(self) -> str:
746 if self.old_name:
747 return f"rename_{self.old_name_lower}_{self.new_name_lower}"
748 assert self.old_fields is not None, "old_fields is set when old_name is None"
749 return "rename_{}_{}_{}".format(
750 self.model_name_lower,
751 "_".join(self.old_fields),
752 self.new_name_lower,
753 )
754
755 def reduce(
756 self, operation: Operation, package_label: str
757 ) -> bool | list[Operation]:
758 if (
759 isinstance(operation, RenameIndex)
760 and self.model_name_lower == operation.model_name_lower
761 and operation.old_name
762 and self.new_name_lower == operation.old_name_lower
763 ):
764 return [
765 RenameIndex(
766 self.model_name,
767 new_name=operation.new_name,
768 old_name=self.old_name,
769 old_fields=self.old_fields,
770 )
771 ]
772 return super().reduce(operation, package_label)
773
774
775class AddConstraint(IndexOperation):
776 option_name = "constraints"
777
778 def __init__(self, model_name: str, constraint: Any) -> None:
779 self.model_name = model_name
780 self.constraint = constraint
781
782 def state_forwards(self, package_label: str, state: ProjectState) -> None:
783 state.add_constraint(package_label, self.model_name_lower, self.constraint)
784
785 def database_forwards(
786 self,
787 package_label: str,
788 schema_editor: BaseDatabaseSchemaEditor,
789 from_state: ProjectState,
790 to_state: ProjectState,
791 ) -> None:
792 model = to_state.models_registry.get_model(package_label, self.model_name)
793 if self.allow_migrate_model(schema_editor.connection, model):
794 schema_editor.add_constraint(model, self.constraint)
795
796 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
797 return (
798 self.__class__.__name__,
799 [],
800 {
801 "model_name": self.model_name,
802 "constraint": self.constraint,
803 },
804 )
805
806 def describe(self) -> str:
807 return f"Create constraint {self.constraint.name} on model {self.model_name}"
808
809 @property
810 def migration_name_fragment(self) -> str:
811 return f"{self.model_name_lower}_{self.constraint.name.lower()}"
812
813
814class RemoveConstraint(IndexOperation):
815 option_name = "constraints"
816
817 def __init__(self, model_name: str, name: str) -> None:
818 self.model_name = model_name
819 self.name = name
820
821 def state_forwards(self, package_label: str, state: ProjectState) -> None:
822 state.remove_constraint(package_label, self.model_name_lower, self.name)
823
824 def database_forwards(
825 self,
826 package_label: str,
827 schema_editor: BaseDatabaseSchemaEditor,
828 from_state: ProjectState,
829 to_state: ProjectState,
830 ) -> None:
831 model = to_state.models_registry.get_model(package_label, self.model_name)
832 if self.allow_migrate_model(schema_editor.connection, model):
833 from_model_state = from_state.models[package_label, self.model_name_lower]
834 constraint = from_model_state.get_constraint_by_name(self.name)
835 schema_editor.remove_constraint(model, constraint)
836
837 def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
838 return (
839 self.__class__.__name__,
840 [],
841 {
842 "model_name": self.model_name,
843 "name": self.name,
844 },
845 )
846
847 def describe(self) -> str:
848 return f"Remove constraint {self.name} from model {self.model_name}"
849
850 @property
851 def migration_name_fragment(self) -> str:
852 return f"remove_{self.model_name_lower}_{self.name.lower()}"