Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from functools import cached_property
  4from typing import TYPE_CHECKING, Any
  5
  6from plain import models
  7from plain.models.base import ModelBase
  8from plain.models.migrations.operations.base import Operation
  9from plain.models.migrations.state import ModelState
 10from plain.models.migrations.utils import field_references, resolve_relation
 11
 12from .fields import AddField, AlterField, FieldOperation, RemoveField, RenameField
 13
 14if TYPE_CHECKING:
 15    from plain.models.backends.base.schema import BaseDatabaseSchemaEditor
 16    from plain.models.fields import Field
 17    from plain.models.migrations.state import ProjectState
 18
 19
 20def _check_for_duplicates(arg_name: str, objs: Any) -> None:
 21    used_vals = set()
 22    for val in objs:
 23        if val in used_vals:
 24            raise ValueError(
 25                f"Found duplicate value {val} in CreateModel {arg_name} argument."
 26            )
 27        used_vals.add(val)
 28
 29
 30class ModelOperation(Operation):
 31    def __init__(self, name: str) -> None:
 32        self.name = name
 33
 34    @cached_property
 35    def name_lower(self) -> str:
 36        return self.name.lower()
 37
 38    def references_model(self, name: str, package_label: str) -> bool:
 39        return name.lower() == self.name_lower
 40
 41    def reduce(
 42        self, operation: Operation, package_label: str
 43    ) -> bool | list[Operation]:
 44        return super().reduce(operation, package_label) or self.can_reduce_through(  # type: ignore[misc]
 45            operation, package_label
 46        )
 47
 48    def can_reduce_through(self, operation: Operation, package_label: str) -> bool:
 49        return not operation.references_model(self.name, package_label)
 50
 51
 52class CreateModel(ModelOperation):
 53    """Create a model's table."""
 54
 55    serialization_expand_args = ["fields", "options"]
 56
 57    def __init__(
 58        self,
 59        name: str,
 60        fields: list[tuple[str, Field]],
 61        options: dict[str, Any] | None = None,
 62        bases: tuple[Any, ...] | None = None,
 63    ) -> None:
 64        self.fields = fields
 65        self.options = options or {}
 66        self.bases = bases or (models.Model,)
 67        super().__init__(name)
 68        # Sanity-check that there are no duplicated field names or bases
 69        _check_for_duplicates("fields", (name for name, _ in self.fields))
 70        _check_for_duplicates(
 71            "bases",
 72            (
 73                base.model_options.label_lower
 74                if not isinstance(base, str)
 75                and base is not models.Model
 76                and hasattr(base, "_model_meta")
 77                else base.lower()
 78                if isinstance(base, str)
 79                else base
 80                for base in self.bases
 81            ),
 82        )
 83
 84    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
 85        kwargs: dict[str, Any] = {
 86            "name": self.name,
 87            "fields": self.fields,
 88        }
 89        if self.options:
 90            kwargs["options"] = self.options
 91        if self.bases and self.bases != (models.Model,):
 92            kwargs["bases"] = self.bases
 93        return (self.__class__.__qualname__, [], kwargs)
 94
 95    def state_forwards(self, package_label: str, state: ProjectState) -> None:
 96        state.add_model(
 97            ModelState(
 98                package_label,
 99                self.name,
100                list(self.fields),
101                dict(self.options),
102                tuple(self.bases),
103            )
104        )
105
106    def database_forwards(
107        self,
108        package_label: str,
109        schema_editor: BaseDatabaseSchemaEditor,
110        from_state: ProjectState,
111        to_state: ProjectState,
112    ) -> None:
113        model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
114        if self.allow_migrate_model(schema_editor.connection, model):
115            schema_editor.create_model(model)
116
117    def describe(self) -> str:
118        return f"Create model {self.name}"
119
120    @property
121    def migration_name_fragment(self) -> str:
122        return self.name_lower
123
124    def references_model(self, name: str, package_label: str) -> bool:
125        name_lower = name.lower()
126        if name_lower == self.name_lower:
127            return True
128
129        # Check we didn't inherit from the model
130        reference_model_tuple = (package_label, name_lower)
131        for base in self.bases:
132            if (
133                base is not models.Model
134                and isinstance(base, ModelBase | str)
135                and resolve_relation(base, package_label) == reference_model_tuple
136            ):
137                return True
138
139        # Check we have no FKs/M2Ms with it
140        for _name, field in self.fields:
141            if field_references(
142                (package_label, self.name_lower), field, reference_model_tuple
143            ):
144                return True
145        return False
146
147    def reduce(
148        self, operation: Operation, package_label: str
149    ) -> bool | list[Operation]:
150        if (
151            isinstance(operation, DeleteModel)
152            and self.name_lower == operation.name_lower
153        ):
154            return []
155        elif (
156            isinstance(operation, RenameModel)
157            and self.name_lower == operation.old_name_lower
158        ):
159            return [
160                CreateModel(
161                    operation.new_name,
162                    fields=self.fields,
163                    options=self.options,
164                    bases=self.bases,
165                ),
166            ]
167        elif (
168            isinstance(operation, AlterModelOptions)
169            and self.name_lower == operation.name_lower
170        ):
171            options = {**self.options, **operation.options}
172            for key in operation.ALTER_OPTION_KEYS:
173                if key not in operation.options:
174                    options.pop(key, None)
175            return [
176                CreateModel(
177                    self.name,
178                    fields=self.fields,
179                    options=options,
180                    bases=self.bases,
181                ),
182            ]
183        elif (
184            isinstance(operation, FieldOperation)
185            and self.name_lower == operation.model_name_lower
186        ):
187            if isinstance(operation, AddField):
188                return [
189                    CreateModel(
190                        self.name,
191                        fields=self.fields + [(operation.name, operation.field)],
192                        options=self.options,
193                        bases=self.bases,
194                    ),
195                ]
196            elif isinstance(operation, AlterField):
197                return [
198                    CreateModel(
199                        self.name,
200                        fields=[
201                            (n, operation.field if n == operation.name else v)
202                            for n, v in self.fields
203                        ],
204                        options=self.options,
205                        bases=self.bases,
206                    ),
207                ]
208            elif isinstance(operation, RemoveField):
209                options = self.options.copy()
210
211                return [
212                    CreateModel(
213                        self.name,
214                        fields=[
215                            (n, v)
216                            for n, v in self.fields
217                            if n.lower() != operation.name_lower
218                        ],
219                        options=options,
220                        bases=self.bases,
221                    ),
222                ]
223            elif isinstance(operation, RenameField):
224                options = self.options.copy()
225
226                return [
227                    CreateModel(
228                        self.name,
229                        fields=[
230                            (operation.new_name if n == operation.old_name else n, v)
231                            for n, v in self.fields
232                        ],
233                        options=options,
234                        bases=self.bases,
235                    ),
236                ]
237        return super().reduce(operation, package_label)
238
239
class DeleteModel(ModelOperation):
    """Drop a model's table."""

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], {"name": ...})``."""
        kwargs: dict[str, Any] = {"name": self.name}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Remove the model, addressed by its lower-cased name, from ``state``."""
        state.remove_model(package_label, self.name_lower)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Delete the table of the model as it exists in ``from_state``.

        Nothing is deleted when ``allow_migrate_model`` rejects the model.
        """
        model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        schema_editor.delete_model(model)

    def references_model(self, name: str, package_label: str) -> bool:
        """Always return True.

        The deleted model could be referencing the specified model through
        related fields.
        """
        return True

    def describe(self) -> str:
        return f"Delete model {self.name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"delete_{self.name_lower}"
274
275
class RenameModel(ModelOperation):
    """Rename a model."""

    def __init__(self, old_name: str, new_name: str) -> None:
        self.old_name = old_name
        self.new_name = new_name
        # The inherited ``name`` attribute tracks the old name.
        super().__init__(old_name)

    @cached_property
    def old_name_lower(self) -> str:
        """Lower-cased ``old_name``, cached after the first access."""
        lowered = self.old_name.lower()
        return lowered

    @cached_property
    def new_name_lower(self) -> str:
        """Lower-cased ``new_name``, cached after the first access."""
        lowered = self.new_name.lower()
        return lowered

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with both names."""
        kwargs: dict[str, Any] = {
            "old_name": self.old_name,
            "new_name": self.new_name,
        }
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Rename the model inside ``state``."""
        state.rename_model(package_label, self.old_name, self.new_name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the table, then alter each related field that points at the model.

        Nothing happens when ``allow_migrate_model`` rejects the new model.
        """
        new_model = to_state.models_registry.get_model(package_label, self.new_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return

        old_model = from_state.models_registry.get_model(  # type: ignore[attr-defined]
            package_label, self.old_name
        )
        # Move the main table
        schema_editor.alter_db_table(
            new_model,
            old_model.model_options.db_table,
            new_model.model_options.db_table,
        )
        # Alter the fields pointing to us
        for related_object in old_model._model_meta.related_objects:
            related_model = related_object.related_model  # type: ignore[attr-defined]
            if related_model == old_model:
                # Self-referential relation: look the field up on the renamed model.
                model = new_model
                related_key = (package_label, self.new_name_lower)
            else:
                model = related_model
                related_options = related_model.model_options
                related_key = (
                    related_options.package_label,
                    related_options.model_name,
                )
            to_model = to_state.models_registry.get_model(*related_key)
            to_field = to_model._model_meta.get_field(related_object.field.name)
            schema_editor.alter_field(model, related_object.field, to_field)

    def references_model(self, name: str, package_label: str) -> bool:
        """Return True when ``name`` matches the old or the new name, ignoring case."""
        candidate = name.lower()
        if candidate == self.old_name_lower:
            return True
        return candidate == self.new_name_lower

    def describe(self) -> str:
        return f"Rename model {self.old_name} to {self.new_name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"rename_{self.old_name_lower}_{self.new_name_lower}"

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Collapse chained renames; otherwise reduce against the *new* name.

        When ``operation`` renames this operation's new name onward, the two
        collapse into one rename from ``old_name`` to ``operation.new_name``.
        """
        chained_rename = (
            isinstance(operation, RenameModel)
            and self.new_name_lower == operation.old_name_lower
        )
        if chained_rename:
            return [RenameModel(self.old_name, operation.new_name)]
        # Skip `ModelOperation.reduce` as we want to run `references_model`
        # against self.new_name.
        inherited = super(ModelOperation, self).reduce(  # type: ignore[misc]
            operation, package_label
        )
        if inherited:
            return inherited
        return not operation.references_model(self.new_name, package_label)
370
371
class ModelOptionOperation(ModelOperation):
    """Base for operations that change a single option of a model."""

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Let a later same-class operation or ``DeleteModel`` replace this one.

        The replacement applies only when both operations name the same model
        (case-insensitively); otherwise the parent ``reduce`` decides.
        """
        supersedes = isinstance(operation, (self.__class__, DeleteModel))
        if supersedes and self.name_lower == operation.name_lower:
            return [operation]
        return super().reduce(operation, package_label)
382
383
class AlterModelTable(ModelOptionOperation):
    """Rename a model's table."""

    def __init__(self, name: str, table: str | None) -> None:
        self.table = table
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with ``name`` and ``table``."""
        kwargs: dict[str, Any] = {"name": self.name, "table": self.table}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Set the model's ``db_table`` option in ``state``."""
        new_options = {"db_table": self.table}
        state.alter_model_options(package_label, self.name_lower, new_options)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the table from its ``from_state`` name to its ``to_state`` name.

        Nothing happens when ``allow_migrate_model`` rejects the new model.
        """
        new_model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return
        old_model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        schema_editor.alter_db_table(
            new_model,
            old_model.model_options.db_table,
            new_model.model_options.db_table,
        )

    def describe(self) -> str:
        # A table of None is reported as "(default)".
        target = "(default)" if self.table is None else self.table
        return f"Rename table for {self.name} to {target}"

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_table"
428
429
class AlterModelTableComment(ModelOptionOperation):
    """Change the comment attached to a model's table."""

    def __init__(self, name: str, table_comment: str | None) -> None:
        self.table_comment = table_comment
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with name and comment."""
        kwargs: dict[str, Any] = {
            "name": self.name,
            "table_comment": self.table_comment,
        }
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Set the model's ``db_table_comment`` option in ``state``."""
        new_options = {"db_table_comment": self.table_comment}
        state.alter_model_options(package_label, self.name_lower, new_options)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Change the table comment from its ``from_state`` to its ``to_state`` value.

        Nothing happens when ``allow_migrate_model`` rejects the new model.
        """
        new_model = to_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, new_model):
            return
        old_model = from_state.models_registry.get_model(package_label, self.name)  # type: ignore[attr-defined]
        schema_editor.alter_db_table_comment(
            new_model,
            old_model.model_options.db_table_comment,
            new_model.model_options.db_table_comment,
        )

    def describe(self) -> str:
        return f"Alter {self.name} table comment"

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_table_comment"
469
470
class AlterModelOptions(ModelOptionOperation):
    """
    Update model options that have no direct effect on the database schema
    (ordering, for example). Python code inside migrations may still rely
    on them.
    """

    # Model options we want to compare and preserve in an AlterModelOptions op
    ALTER_OPTION_KEYS = [
        "ordering",
    ]

    def __init__(self, name: str, options: dict[str, Any]) -> None:
        self.options = options
        super().__init__(name)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with name and options."""
        kwargs: dict[str, Any] = {"name": self.name, "options": self.options}
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Apply ``options`` to the model in ``state``, passing ``ALTER_OPTION_KEYS``."""
        state.alter_model_options(
            package_label, self.name_lower, self.options, self.ALTER_OPTION_KEYS
        )

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Intentionally a no-op: this operation performs no schema work."""
        return None

    def describe(self) -> str:
        return f"Change Meta options on {self.name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"alter_{self.name_lower}_options"
517
518
class IndexOperation(Operation):
    """Base for operations addressed by ``model_name``; subclasses set that attribute."""

    # Name of the model option these operations work on.
    option_name = "indexes"

    @cached_property
    def model_name_lower(self) -> str:
        """Lower-cased ``model_name``, cached after the first access."""
        lowered = self.model_name.lower()  # type: ignore[attr-defined]
        return lowered
525
526
class AddIndex(IndexOperation):
    """Add an index on a model."""

    def __init__(self, model_name: str, index: Any) -> None:
        """Store the target model name and index.

        Raises:
            ValueError: if ``index.name`` is falsy.
        """
        self.model_name = model_name
        if not index.name:  # type: ignore[attr-defined]
            message = (
                "Indexes passed to AddIndex operations require a name "
                f"argument. {index!r} doesn't have one."
            )
            raise ValueError(message)
        self.index = index

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Register the index on the model in ``state``."""
        state.add_index(package_label, self.model_name_lower, self.index)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Create the index on the model as it exists in ``to_state``.

        Nothing happens when ``allow_migrate_model`` rejects the model.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        schema_editor.add_index(model, self.index)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with model name and index."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "index": self.index,
        }
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        """Describe the index by its expressions if it has any, else by its fields."""
        index = self.index
        if index.expressions:  # type: ignore[attr-defined]
            rendered = ", ".join(str(expression) for expression in index.expressions)  # type: ignore[attr-defined]
            return f"Create index {index.name} on {rendered} on model {self.model_name}"  # type: ignore[attr-defined]
        field_list = ", ".join(index.fields)  # type: ignore[attr-defined]
        return (
            f"Create index {index.name} on field(s) {field_list} "  # type: ignore[attr-defined]
            f"of model {self.model_name}"
        )

    @property
    def migration_name_fragment(self) -> str:
        index_name_lower = self.index.name.lower()  # type: ignore[attr-defined]
        return f"{self.model_name_lower}_{index_name_lower}"
580
581
class RemoveIndex(IndexOperation):
    """Remove an index from a model."""

    def __init__(self, model_name: str, name: str) -> None:
        self.model_name = model_name
        self.name = name

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Drop the named index from the model in ``state``."""
        state.remove_index(package_label, self.model_name_lower, self.name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Remove the index, looked up by name in the ``from_state`` model state.

        Nothing happens when ``allow_migrate_model`` rejects the model.
        """
        model = from_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return
        from_model_state = from_state.models[package_label, self.model_name_lower]
        doomed_index = from_model_state.get_index_by_name(self.name)
        schema_editor.remove_index(model, doomed_index)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` with model and index names."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "name": self.name,
        }
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Remove index {self.name} from {self.model_name}"

    @property
    def migration_name_fragment(self) -> str:
        index_name_lower = self.name.lower()
        return f"remove_{self.model_name_lower}_{index_name_lower}"
622
623
class RenameIndex(IndexOperation):
    """Rename an index."""

    def __init__(
        self,
        model_name: str,
        new_name: str,
        old_name: str | None = None,
        old_fields: list[str] | tuple[str, ...] | None = None,
    ) -> None:
        """Store the rename; the old index is given by name or by fields.

        Raises:
            ValueError: if neither or both of ``old_name`` and ``old_fields``
                are provided.
        """
        if not (old_name or old_fields):
            raise ValueError(
                "RenameIndex requires one of old_name and old_fields arguments to be "
                "set."
            )
        if old_name and old_fields:
            raise ValueError(
                "RenameIndex.old_name and old_fields are mutually exclusive."
            )
        self.model_name = model_name
        self.new_name = new_name
        self.old_name = old_name
        self.old_fields = old_fields

    @cached_property
    def old_name_lower(self) -> str:
        """Lower-cased ``old_name``, cached after the first access."""
        lowered = self.old_name.lower()  # type: ignore[union-attr]
        return lowered

    @cached_property
    def new_name_lower(self) -> str:
        """Lower-cased ``new_name``, cached after the first access."""
        lowered = self.new_name.lower()
        return lowered

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)``.

        ``old_name`` and ``old_fields`` are each included only when truthy.
        """
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "new_name": self.new_name,
        }
        if self.old_name:
            kwargs["old_name"] = self.old_name
        if self.old_fields:
            kwargs["old_fields"] = self.old_fields
        return (self.__class__.__qualname__, [], kwargs)

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Update ``state``: add a named index for ``old_fields``, else rename by name."""
        if not self.old_fields:
            state.rename_index(
                package_label,
                self.model_name_lower,
                self.old_name,
                self.new_name,  # type: ignore[arg-type]
            )
            return
        named_index = models.Index(fields=self.old_fields, name=self.new_name)
        state.add_index(package_label, self.model_name_lower, named_index)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Rename the index in the database.

        With ``old_fields`` the existing index name is found through the schema
        editor; otherwise the old index comes from the ``from_state`` model
        state. Nothing happens when ``allow_migrate_model`` rejects the model
        or the name is unchanged.

        Raises:
            ValueError: if the ``old_fields`` lookup does not find exactly one index.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if not self.allow_migrate_model(schema_editor.connection, model):
            return None

        if self.old_fields:
            from_model = from_state.models_registry.get_model(  # type: ignore[attr-defined]
                package_label, self.model_name
            )
            columns = [
                from_model._model_meta.get_field(field_name).column
                for field_name in self.old_fields
            ]
            matching_index_name = schema_editor._constraint_names(
                from_model, column_names=columns, index=True
            )
            match_count = len(matching_index_name)
            if match_count != 1:
                table_name = from_model.model_options.db_table
                column_list = ", ".join(columns)
                raise ValueError(
                    f"Found wrong number ({match_count}) of indexes for "
                    f"{table_name}({column_list})."
                )
            old_index = models.Index(
                fields=self.old_fields,
                name=matching_index_name[0],
            )
        else:
            from_model_state = from_state.models[package_label, self.model_name_lower]
            old_index = from_model_state.get_index_by_name(self.old_name)  # type: ignore[arg-type]

        # Don't alter when the index name is not changed.
        if old_index.name == self.new_name:  # type: ignore[attr-defined]
            return None

        to_model_state = to_state.models[package_label, self.model_name_lower]
        new_index = to_model_state.get_index_by_name(self.new_name)
        schema_editor.rename_index(model, old_index, new_index)
        return None

    def describe(self) -> str:
        if not self.old_name:
            return (
                f"Rename unnamed index for {self.old_fields} on {self.model_name} to "
                f"{self.new_name}"
            )
        return f"Rename index {self.old_name} on {self.model_name} to {self.new_name}"

    @property
    def migration_name_fragment(self) -> str:
        if self.old_name:
            return f"rename_{self.old_name_lower}_{self.new_name_lower}"
        joined_fields = "_".join(self.old_fields)  # type: ignore[arg-type]
        return f"rename_{self.model_name_lower}_{joined_fields}_{self.new_name_lower}"

    def reduce(
        self, operation: Operation, package_label: str
    ) -> bool | list[Operation]:
        """Collapse two chained renames of the same index into one.

        This applies when ``operation`` is a by-name ``RenameIndex`` on the same
        model whose old name is this operation's new name. Anything else is
        delegated to the parent ``reduce``.
        """
        chained_rename = (
            isinstance(operation, RenameIndex)
            and self.model_name_lower == operation.model_name_lower
            and operation.old_name
            and self.new_name_lower == operation.old_name_lower
        )
        if chained_rename:
            combined = RenameIndex(
                self.model_name,
                new_name=operation.new_name,
                old_name=self.old_name,
                old_fields=self.old_fields,
            )
            return [combined]
        return super().reduce(operation, package_label)
766
767
class AddConstraint(IndexOperation):
    """Add a constraint to a model."""

    option_name = "constraints"

    def __init__(self, model_name: str, constraint: Any) -> None:
        self.model_name = model_name
        self.constraint = constraint

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Register the constraint on the model in ``state``."""
        state.add_constraint(package_label, self.model_name_lower, self.constraint)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Create the constraint on the model as it exists in ``to_state``.

        Nothing happens when ``allow_migrate_model`` rejects the model.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if self.allow_migrate_model(schema_editor.connection, model):
            schema_editor.add_constraint(model, self.constraint)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` describing this operation."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "constraint": self.constraint,
        }
        # Use __qualname__ rather than __name__, as the index and model
        # operations in this module do: the two are identical for top-level
        # classes, but only __qualname__ keeps the dotted path of a subclass
        # defined inside another class.
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Create constraint {self.constraint.name} on model {self.model_name}"  # type: ignore[attr-defined]

    @property
    def migration_name_fragment(self) -> str:
        return f"{self.model_name_lower}_{self.constraint.name.lower()}"  # type: ignore[attr-defined]
805
806
class RemoveConstraint(IndexOperation):
    """Remove a constraint from a model."""

    option_name = "constraints"

    def __init__(self, model_name: str, name: str) -> None:
        self.model_name = model_name
        self.name = name

    def state_forwards(self, package_label: str, state: ProjectState) -> None:
        """Drop the named constraint from the model in ``state``."""
        state.remove_constraint(package_label, self.model_name_lower, self.name)

    def database_forwards(
        self,
        package_label: str,
        schema_editor: BaseDatabaseSchemaEditor,
        from_state: ProjectState,
        to_state: ProjectState,
    ) -> None:
        """Remove the constraint, looked up by name in the ``from_state`` model state.

        The model passed to the schema editor is taken from ``to_state``.
        Nothing happens when ``allow_migrate_model`` rejects that model.
        """
        model = to_state.models_registry.get_model(package_label, self.model_name)  # type: ignore[attr-defined]
        if self.allow_migrate_model(schema_editor.connection, model):
            from_model_state = from_state.models[package_label, self.model_name_lower]
            constraint = from_model_state.get_constraint_by_name(self.name)
            schema_editor.remove_constraint(model, constraint)

    def deconstruct(self) -> tuple[str, list[Any], dict[str, Any]]:
        """Return ``(qualified class name, [], kwargs)`` describing this operation."""
        kwargs: dict[str, Any] = {
            "model_name": self.model_name,
            "name": self.name,
        }
        # Use __qualname__ rather than __name__, as the index and model
        # operations in this module do: the two are identical for top-level
        # classes, but only __qualname__ keeps the dotted path of a subclass
        # defined inside another class.
        return (self.__class__.__qualname__, [], kwargs)

    def describe(self) -> str:
        return f"Remove constraint {self.name} from model {self.model_name}"

    @property
    def migration_name_fragment(self) -> str:
        return f"remove_{self.model_name_lower}_{self.name.lower()}"