Plain is headed towards 1.0! Subscribe for development updates โ†’

plain.models

Model your data and store it in a database.

# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField


class User(models.Model):
    email = models.EmailField()
    password = PasswordField()
    is_admin = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.email

Create, update, and delete instances of your models:

from .models import User


# Create a new user
user = User.objects.create(
    email="[email protected]",
    password="password",
)

# Update a user
user.email = "[email protected]"
user.save()

# Delete a user
user.delete()

# Query for users
admin_users = User.objects.filter(is_admin=True)

Installation

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.models",
]

To connect to a database, you can provide a DATABASE_URL environment variable.

DATABASE_URL=postgresql://user:password@localhost:5432/dbname

Or you can manually define the DATABASES setting.

# app/settings.py
DATABASES = {
    "default": {
        "ENGINE": "plain.models.backends.postgresql",
        "NAME": "dbname",
        "USER": "user",
        "PASSWORD": "password",
        "HOST": "localhost",
        "PORT": "5432",
    }
}

Multiple backends are supported, including Postgres, MySQL, and SQLite.

Querying

Migrations

Migration docs

Fields

Field docs

Validation

Indexes and constraints

Managers

Forms

  1from enum import Enum
  2from types import NoneType
  3
  4from plain.exceptions import FieldError, ValidationError
  5from plain.models.db import DEFAULT_DB_ALIAS, connections
  6from plain.models.expressions import Exists, ExpressionList, F, OrderBy
  7from plain.models.indexes import IndexExpression
  8from plain.models.lookups import Exact
  9from plain.models.query_utils import Q
 10from plain.models.sql.query import Query
 11
 12__all__ = ["BaseConstraint", "CheckConstraint", "Deferrable", "UniqueConstraint"]
 13
 14
 15class BaseConstraint:
 16    default_violation_error_message = "Constraint โ€œ%(name)sโ€ is violated."
 17    violation_error_code = None
 18    violation_error_message = None
 19
 20    def __init__(
 21        self, *, name, violation_error_code=None, violation_error_message=None
 22    ):
 23        self.name = name
 24        if violation_error_code is not None:
 25            self.violation_error_code = violation_error_code
 26        if violation_error_message is not None:
 27            self.violation_error_message = violation_error_message
 28        else:
 29            self.violation_error_message = self.default_violation_error_message
 30
 31    @property
 32    def contains_expressions(self):
 33        return False
 34
 35    def constraint_sql(self, model, schema_editor):
 36        raise NotImplementedError("This method must be implemented by a subclass.")
 37
 38    def create_sql(self, model, schema_editor):
 39        raise NotImplementedError("This method must be implemented by a subclass.")
 40
 41    def remove_sql(self, model, schema_editor):
 42        raise NotImplementedError("This method must be implemented by a subclass.")
 43
 44    def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
 45        raise NotImplementedError("This method must be implemented by a subclass.")
 46
 47    def get_violation_error_message(self):
 48        return self.violation_error_message % {"name": self.name}
 49
 50    def deconstruct(self):
 51        path = f"{self.__class__.__module__}.{self.__class__.__name__}"
 52        path = path.replace("plain.models.constraints", "plain.models")
 53        kwargs = {"name": self.name}
 54        if (
 55            self.violation_error_message is not None
 56            and self.violation_error_message != self.default_violation_error_message
 57        ):
 58            kwargs["violation_error_message"] = self.violation_error_message
 59        if self.violation_error_code is not None:
 60            kwargs["violation_error_code"] = self.violation_error_code
 61        return (path, (), kwargs)
 62
 63    def clone(self):
 64        _, args, kwargs = self.deconstruct()
 65        return self.__class__(*args, **kwargs)
 66
 67
 68class CheckConstraint(BaseConstraint):
 69    def __init__(
 70        self, *, check, name, violation_error_code=None, violation_error_message=None
 71    ):
 72        self.check = check
 73        if not getattr(check, "conditional", False):
 74            raise TypeError(
 75                "CheckConstraint.check must be a Q instance or boolean expression."
 76            )
 77        super().__init__(
 78            name=name,
 79            violation_error_code=violation_error_code,
 80            violation_error_message=violation_error_message,
 81        )
 82
 83    def _get_check_sql(self, model, schema_editor):
 84        query = Query(model=model, alias_cols=False)
 85        where = query.build_where(self.check)
 86        compiler = query.get_compiler(connection=schema_editor.connection)
 87        sql, params = where.as_sql(compiler, schema_editor.connection)
 88        return sql % tuple(schema_editor.quote_value(p) for p in params)
 89
 90    def constraint_sql(self, model, schema_editor):
 91        check = self._get_check_sql(model, schema_editor)
 92        return schema_editor._check_sql(self.name, check)
 93
 94    def create_sql(self, model, schema_editor):
 95        check = self._get_check_sql(model, schema_editor)
 96        return schema_editor._create_check_sql(model, self.name, check)
 97
 98    def remove_sql(self, model, schema_editor):
 99        return schema_editor._delete_check_sql(model, self.name)
100
101    def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
102        against = instance._get_field_value_map(meta=model._meta, exclude=exclude)
103        try:
104            if not Q(self.check).check(against, using=using):
105                raise ValidationError(
106                    self.get_violation_error_message(), code=self.violation_error_code
107                )
108        except FieldError:
109            pass
110
111    def __repr__(self):
112        return "<{}: check={} name={}{}{}>".format(
113            self.__class__.__qualname__,
114            self.check,
115            repr(self.name),
116            (
117                ""
118                if self.violation_error_code is None
119                else f" violation_error_code={self.violation_error_code!r}"
120            ),
121            (
122                ""
123                if self.violation_error_message is None
124                or self.violation_error_message == self.default_violation_error_message
125                else f" violation_error_message={self.violation_error_message!r}"
126            ),
127        )
128
129    def __eq__(self, other):
130        if isinstance(other, CheckConstraint):
131            return (
132                self.name == other.name
133                and self.check == other.check
134                and self.violation_error_code == other.violation_error_code
135                and self.violation_error_message == other.violation_error_message
136            )
137        return super().__eq__(other)
138
139    def deconstruct(self):
140        path, args, kwargs = super().deconstruct()
141        kwargs["check"] = self.check
142        return path, args, kwargs
143
144
145class Deferrable(Enum):
146    DEFERRED = "deferred"
147    IMMEDIATE = "immediate"
148
149    # A similar format was proposed for Python 3.10.
150    def __repr__(self):
151        return f"{self.__class__.__qualname__}.{self._name_}"
152
153
154class UniqueConstraint(BaseConstraint):
155    def __init__(
156        self,
157        *expressions,
158        fields=(),
159        name=None,
160        condition=None,
161        deferrable=None,
162        include=None,
163        opclasses=(),
164        violation_error_code=None,
165        violation_error_message=None,
166    ):
167        if not name:
168            raise ValueError("A unique constraint must be named.")
169        if not expressions and not fields:
170            raise ValueError(
171                "At least one field or expression is required to define a "
172                "unique constraint."
173            )
174        if expressions and fields:
175            raise ValueError(
176                "UniqueConstraint.fields and expressions are mutually exclusive."
177            )
178        if not isinstance(condition, NoneType | Q):
179            raise ValueError("UniqueConstraint.condition must be a Q instance.")
180        if condition and deferrable:
181            raise ValueError("UniqueConstraint with conditions cannot be deferred.")
182        if include and deferrable:
183            raise ValueError("UniqueConstraint with include fields cannot be deferred.")
184        if opclasses and deferrable:
185            raise ValueError("UniqueConstraint with opclasses cannot be deferred.")
186        if expressions and deferrable:
187            raise ValueError("UniqueConstraint with expressions cannot be deferred.")
188        if expressions and opclasses:
189            raise ValueError(
190                "UniqueConstraint.opclasses cannot be used with expressions. "
191                "Use a custom OpClass() instead."
192            )
193        if not isinstance(deferrable, NoneType | Deferrable):
194            raise ValueError(
195                "UniqueConstraint.deferrable must be a Deferrable instance."
196            )
197        if not isinstance(include, NoneType | list | tuple):
198            raise ValueError("UniqueConstraint.include must be a list or tuple.")
199        if not isinstance(opclasses, list | tuple):
200            raise ValueError("UniqueConstraint.opclasses must be a list or tuple.")
201        if opclasses and len(fields) != len(opclasses):
202            raise ValueError(
203                "UniqueConstraint.fields and UniqueConstraint.opclasses must "
204                "have the same number of elements."
205            )
206        self.fields = tuple(fields)
207        self.condition = condition
208        self.deferrable = deferrable
209        self.include = tuple(include) if include else ()
210        self.opclasses = opclasses
211        self.expressions = tuple(
212            F(expression) if isinstance(expression, str) else expression
213            for expression in expressions
214        )
215        super().__init__(
216            name=name,
217            violation_error_code=violation_error_code,
218            violation_error_message=violation_error_message,
219        )
220
221    @property
222    def contains_expressions(self):
223        return bool(self.expressions)
224
225    def _get_condition_sql(self, model, schema_editor):
226        if self.condition is None:
227            return None
228        query = Query(model=model, alias_cols=False)
229        where = query.build_where(self.condition)
230        compiler = query.get_compiler(connection=schema_editor.connection)
231        sql, params = where.as_sql(compiler, schema_editor.connection)
232        return sql % tuple(schema_editor.quote_value(p) for p in params)
233
234    def _get_index_expressions(self, model, schema_editor):
235        if not self.expressions:
236            return None
237        index_expressions = []
238        for expression in self.expressions:
239            index_expression = IndexExpression(expression)
240            index_expression.set_wrapper_classes(schema_editor.connection)
241            index_expressions.append(index_expression)
242        return ExpressionList(*index_expressions).resolve_expression(
243            Query(model, alias_cols=False),
244        )
245
246    def constraint_sql(self, model, schema_editor):
247        fields = [model._meta.get_field(field_name) for field_name in self.fields]
248        include = [
249            model._meta.get_field(field_name).column for field_name in self.include
250        ]
251        condition = self._get_condition_sql(model, schema_editor)
252        expressions = self._get_index_expressions(model, schema_editor)
253        return schema_editor._unique_sql(
254            model,
255            fields,
256            self.name,
257            condition=condition,
258            deferrable=self.deferrable,
259            include=include,
260            opclasses=self.opclasses,
261            expressions=expressions,
262        )
263
264    def create_sql(self, model, schema_editor):
265        fields = [model._meta.get_field(field_name) for field_name in self.fields]
266        include = [
267            model._meta.get_field(field_name).column for field_name in self.include
268        ]
269        condition = self._get_condition_sql(model, schema_editor)
270        expressions = self._get_index_expressions(model, schema_editor)
271        return schema_editor._create_unique_sql(
272            model,
273            fields,
274            self.name,
275            condition=condition,
276            deferrable=self.deferrable,
277            include=include,
278            opclasses=self.opclasses,
279            expressions=expressions,
280        )
281
282    def remove_sql(self, model, schema_editor):
283        condition = self._get_condition_sql(model, schema_editor)
284        include = [
285            model._meta.get_field(field_name).column for field_name in self.include
286        ]
287        expressions = self._get_index_expressions(model, schema_editor)
288        return schema_editor._delete_unique_sql(
289            model,
290            self.name,
291            condition=condition,
292            deferrable=self.deferrable,
293            include=include,
294            opclasses=self.opclasses,
295            expressions=expressions,
296        )
297
298    def __repr__(self):
299        return "<{}:{}{}{}{}{}{}{}{}{}>".format(
300            self.__class__.__qualname__,
301            "" if not self.fields else f" fields={repr(self.fields)}",
302            "" if not self.expressions else f" expressions={repr(self.expressions)}",
303            f" name={repr(self.name)}",
304            "" if self.condition is None else f" condition={self.condition}",
305            "" if self.deferrable is None else f" deferrable={self.deferrable!r}",
306            "" if not self.include else f" include={repr(self.include)}",
307            "" if not self.opclasses else f" opclasses={repr(self.opclasses)}",
308            (
309                ""
310                if self.violation_error_code is None
311                else f" violation_error_code={self.violation_error_code!r}"
312            ),
313            (
314                ""
315                if self.violation_error_message is None
316                or self.violation_error_message == self.default_violation_error_message
317                else f" violation_error_message={self.violation_error_message!r}"
318            ),
319        )
320
321    def __eq__(self, other):
322        if isinstance(other, UniqueConstraint):
323            return (
324                self.name == other.name
325                and self.fields == other.fields
326                and self.condition == other.condition
327                and self.deferrable == other.deferrable
328                and self.include == other.include
329                and self.opclasses == other.opclasses
330                and self.expressions == other.expressions
331                and self.violation_error_code == other.violation_error_code
332                and self.violation_error_message == other.violation_error_message
333            )
334        return super().__eq__(other)
335
336    def deconstruct(self):
337        path, args, kwargs = super().deconstruct()
338        if self.fields:
339            kwargs["fields"] = self.fields
340        if self.condition:
341            kwargs["condition"] = self.condition
342        if self.deferrable:
343            kwargs["deferrable"] = self.deferrable
344        if self.include:
345            kwargs["include"] = self.include
346        if self.opclasses:
347            kwargs["opclasses"] = self.opclasses
348        return path, self.expressions, kwargs
349
350    def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
351        queryset = model._default_manager.using(using)
352        if self.fields:
353            lookup_kwargs = {}
354            for field_name in self.fields:
355                if exclude and field_name in exclude:
356                    return
357                field = model._meta.get_field(field_name)
358                lookup_value = getattr(instance, field.attname)
359                if lookup_value is None or (
360                    lookup_value == ""
361                    and connections[using].features.interprets_empty_strings_as_nulls
362                ):
363                    # A composite constraint containing NULL value cannot cause
364                    # a violation since NULL != NULL in SQL.
365                    return
366                lookup_kwargs[field.name] = lookup_value
367            queryset = queryset.filter(**lookup_kwargs)
368        else:
369            # Ignore constraints with excluded fields.
370            if exclude:
371                for expression in self.expressions:
372                    if hasattr(expression, "flatten"):
373                        for expr in expression.flatten():
374                            if isinstance(expr, F) and expr.name in exclude:
375                                return
376                    elif isinstance(expression, F) and expression.name in exclude:
377                        return
378            replacements = {
379                F(field): value
380                for field, value in instance._get_field_value_map(
381                    meta=model._meta, exclude=exclude
382                ).items()
383            }
384            expressions = []
385            for expr in self.expressions:
386                # Ignore ordering.
387                if isinstance(expr, OrderBy):
388                    expr = expr.expression
389                expressions.append(Exact(expr, expr.replace_expressions(replacements)))
390            queryset = queryset.filter(*expressions)
391        model_class_pk = instance._get_pk_val(model._meta)
392        if not instance._state.adding and model_class_pk is not None:
393            queryset = queryset.exclude(pk=model_class_pk)
394        if not self.condition:
395            if queryset.exists():
396                if self.expressions:
397                    raise ValidationError(
398                        self.get_violation_error_message(),
399                        code=self.violation_error_code,
400                    )
401                # When fields are defined, use the unique_error_message() for
402                # backward compatibility.
403                for model, constraints in instance.get_constraints():
404                    for constraint in constraints:
405                        if constraint is self:
406                            raise ValidationError(
407                                instance.unique_error_message(model, self.fields),
408                            )
409        else:
410            against = instance._get_field_value_map(meta=model._meta, exclude=exclude)
411            try:
412                if (self.condition & Exists(queryset.filter(self.condition))).check(
413                    against, using=using
414                ):
415                    raise ValidationError(
416                        self.get_violation_error_message(),
417                        code=self.violation_error_code,
418                    )
419            except FieldError:
420                pass