Plain is headed towards 1.0! Subscribe for development updates →

plain.models

Model your data and store it in a database.

# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField


class User(models.Model):
    """Example user model with an email, a password and a staff flag."""

    # Email address; unique=True is passed so duplicates are not allowed.
    email = models.EmailField(unique=True)
    # Uses PasswordField from plain.passwords.models
    # (presumably handles hashing -- confirm against that package's docs).
    password = PasswordField()
    # Staff flag, False unless set explicitly.
    is_staff = models.BooleanField(default=False)
    # auto_now_add=True: presumably stamped once when the row is first
    # created -- confirm against the field docs.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Return the user's email as the string representation."""
        return self.email

Create, update, and delete instances of your models:

from .models import User


# Create a new user
# (the email values below are well-formed example addresses; the scraped
# page showed an email-obfuscation placeholder instead of a real address)
user = User.objects.create(
    email="user@example.com",
    password="password",
)

# Update a user: change an attribute, then call save()
user.email = "new@example.com"
user.save()

# Delete a user
user.delete()

# Query for users: keep only those with is_staff=True
staff_users = User.objects.filter(is_staff=True)

Installation

# app/settings.py
# Installation step: list "plain.models" among the installed packages.
INSTALLED_PACKAGES = [
    ...
    "plain.models",
]

To connect to a database, you can provide a DATABASE_URL environment variable.

DATABASE_URL=postgresql://user:password@localhost:5432/dbname

Or you can manually define the DATABASES setting.

# app/settings.py
# Manual alternative to the DATABASE_URL environment variable: the
# "default" entry below carries the same values as the example URL
# (postgresql://user:password@localhost:5432/dbname).
DATABASES = {
    "default": {
        "ENGINE": "plain.models.backends.postgresql",
        "NAME": "dbname",
        "USER": "user",
        "PASSWORD": "password",
        "HOST": "localhost",
        "PORT": "5432",
    }
}

Multiple backends are supported, including Postgres, MySQL, and SQLite.

Querying

Migrations

Migration docs

Fields

Field docs

Validation

Indexes and constraints

Managers

Forms

  1from types import NoneType
  2
  3from plain.models.backends.utils import names_digest, split_identifier
  4from plain.models.expressions import Col, ExpressionList, F, Func, OrderBy
  5from plain.models.functions import Collate
  6from plain.models.query_utils import Q
  7from plain.models.sql import Query
  8from plain.utils.functional import partition
  9
 10__all__ = ["Index"]
 11
 12
 13class Index:
 14    suffix = "idx"
 15    # The max length of the name of the index (restricted to 30 for
 16    # cross-database compatibility with Oracle)
 17    max_name_length = 30
 18
 19    def __init__(
 20        self,
 21        *expressions,
 22        fields=(),
 23        name=None,
 24        db_tablespace=None,
 25        opclasses=(),
 26        condition=None,
 27        include=None,
 28    ):
 29        if opclasses and not name:
 30            raise ValueError("An index must be named to use opclasses.")
 31        if not isinstance(condition, NoneType | Q):
 32            raise ValueError("Index.condition must be a Q instance.")
 33        if condition and not name:
 34            raise ValueError("An index must be named to use condition.")
 35        if not isinstance(fields, list | tuple):
 36            raise ValueError("Index.fields must be a list or tuple.")
 37        if not isinstance(opclasses, list | tuple):
 38            raise ValueError("Index.opclasses must be a list or tuple.")
 39        if not expressions and not fields:
 40            raise ValueError(
 41                "At least one field or expression is required to define an index."
 42            )
 43        if expressions and fields:
 44            raise ValueError(
 45                "Index.fields and expressions are mutually exclusive.",
 46            )
 47        if expressions and not name:
 48            raise ValueError("An index must be named to use expressions.")
 49        if expressions and opclasses:
 50            raise ValueError(
 51                "Index.opclasses cannot be used with expressions. Use "
 52                "a custom OpClass() instead."
 53            )
 54        if opclasses and len(fields) != len(opclasses):
 55            raise ValueError(
 56                "Index.fields and Index.opclasses must have the same number of "
 57                "elements."
 58            )
 59        if fields and not all(isinstance(field, str) for field in fields):
 60            raise ValueError("Index.fields must contain only strings with field names.")
 61        if include and not name:
 62            raise ValueError("A covering index must be named.")
 63        if not isinstance(include, NoneType | list | tuple):
 64            raise ValueError("Index.include must be a list or tuple.")
 65        self.fields = list(fields)
 66        # A list of 2-tuple with the field name and ordering ('' or 'DESC').
 67        self.fields_orders = [
 68            (field_name.removeprefix("-"), "DESC" if field_name.startswith("-") else "")
 69            for field_name in self.fields
 70        ]
 71        self.name = name or ""
 72        self.db_tablespace = db_tablespace
 73        self.opclasses = opclasses
 74        self.condition = condition
 75        self.include = tuple(include) if include else ()
 76        self.expressions = tuple(
 77            F(expression) if isinstance(expression, str) else expression
 78            for expression in expressions
 79        )
 80
 81    @property
 82    def contains_expressions(self):
 83        return bool(self.expressions)
 84
 85    def _get_condition_sql(self, model, schema_editor):
 86        if self.condition is None:
 87            return None
 88        query = Query(model=model, alias_cols=False)
 89        where = query.build_where(self.condition)
 90        compiler = query.get_compiler(connection=schema_editor.connection)
 91        sql, params = where.as_sql(compiler, schema_editor.connection)
 92        return sql % tuple(schema_editor.quote_value(p) for p in params)
 93
 94    def create_sql(self, model, schema_editor, using="", **kwargs):
 95        include = [
 96            model._meta.get_field(field_name).column for field_name in self.include
 97        ]
 98        condition = self._get_condition_sql(model, schema_editor)
 99        if self.expressions:
100            index_expressions = []
101            for expression in self.expressions:
102                index_expression = IndexExpression(expression)
103                index_expression.set_wrapper_classes(schema_editor.connection)
104                index_expressions.append(index_expression)
105            expressions = ExpressionList(*index_expressions).resolve_expression(
106                Query(model, alias_cols=False),
107            )
108            fields = None
109            col_suffixes = None
110        else:
111            fields = [
112                model._meta.get_field(field_name)
113                for field_name, _ in self.fields_orders
114            ]
115            if schema_editor.connection.features.supports_index_column_ordering:
116                col_suffixes = [order[1] for order in self.fields_orders]
117            else:
118                col_suffixes = [""] * len(self.fields_orders)
119            expressions = None
120        return schema_editor._create_index_sql(
121            model,
122            fields=fields,
123            name=self.name,
124            using=using,
125            db_tablespace=self.db_tablespace,
126            col_suffixes=col_suffixes,
127            opclasses=self.opclasses,
128            condition=condition,
129            include=include,
130            expressions=expressions,
131            **kwargs,
132        )
133
134    def remove_sql(self, model, schema_editor, **kwargs):
135        return schema_editor._delete_index_sql(model, self.name, **kwargs)
136
137    def deconstruct(self):
138        path = f"{self.__class__.__module__}.{self.__class__.__name__}"
139        path = path.replace("plain.models.indexes", "plain.models")
140        kwargs = {"name": self.name}
141        if self.fields:
142            kwargs["fields"] = self.fields
143        if self.db_tablespace is not None:
144            kwargs["db_tablespace"] = self.db_tablespace
145        if self.opclasses:
146            kwargs["opclasses"] = self.opclasses
147        if self.condition:
148            kwargs["condition"] = self.condition
149        if self.include:
150            kwargs["include"] = self.include
151        return (path, self.expressions, kwargs)
152
153    def clone(self):
154        """Create a copy of this Index."""
155        _, args, kwargs = self.deconstruct()
156        return self.__class__(*args, **kwargs)
157
158    def set_name_with_model(self, model):
159        """
160        Generate a unique name for the index.
161
162        The name is divided into 3 parts - table name (12 chars), field name
163        (8 chars) and unique hash + suffix (10 chars). Each part is made to
164        fit its size by truncating the excess length.
165        """
166        _, table_name = split_identifier(model._meta.db_table)
167        column_names = [
168            model._meta.get_field(field_name).column
169            for field_name, order in self.fields_orders
170        ]
171        column_names_with_order = [
172            (("-%s" if order else "%s") % column_name)
173            for column_name, (field_name, order) in zip(
174                column_names, self.fields_orders
175            )
176        ]
177        # The length of the parts of the name is based on the default max
178        # length of 30 characters.
179        hash_data = [table_name] + column_names_with_order + [self.suffix]
180        self.name = "{}_{}_{}".format(
181            table_name[:11],
182            column_names[0][:7],
183            f"{names_digest(*hash_data, length=6)}_{self.suffix}",
184        )
185        if len(self.name) > self.max_name_length:
186            raise ValueError(
187                "Index too long for multiple database support. Is self.suffix "
188                "longer than 3 characters?"
189            )
190        if self.name[0] == "_" or self.name[0].isdigit():
191            self.name = "D%s" % self.name[1:]
192
193    def __repr__(self):
194        return "<{}:{}{}{}{}{}{}{}>".format(
195            self.__class__.__qualname__,
196            "" if not self.fields else " fields=%s" % repr(self.fields),
197            "" if not self.expressions else " expressions=%s" % repr(self.expressions),
198            "" if not self.name else " name=%s" % repr(self.name),
199            ""
200            if self.db_tablespace is None
201            else " db_tablespace=%s" % repr(self.db_tablespace),
202            "" if self.condition is None else " condition=%s" % self.condition,
203            "" if not self.include else " include=%s" % repr(self.include),
204            "" if not self.opclasses else " opclasses=%s" % repr(self.opclasses),
205        )
206
207    def __eq__(self, other):
208        if self.__class__ == other.__class__:
209            return self.deconstruct() == other.deconstruct()
210        return NotImplemented
211
212
class IndexExpression(Func):
    """Order and wrap expressions for CREATE INDEX statements."""

    template = "%(expressions)s"
    # Expression types treated as wrappers; their order here is the nesting
    # order used when the wrappers are re-chained (first = outermost).
    wrapper_classes = (OrderBy, Collate)

    def set_wrapper_classes(self, connection=None):
        """Drop Collate from this instance's wrappers if the backend requires it."""
        # Some databases (e.g. MySQL) treats COLLATE as an indexed expression.
        if not connection or not connection.features.collate_as_index_expression:
            return
        self.wrapper_classes = tuple(
            candidate for candidate in self.wrapper_classes if candidate is not Collate
        )

    @classmethod
    def register_wrappers(cls, *wrapper_classes):
        """Replace the class-level wrapper classes with the given ones."""
        cls.wrapper_classes = wrapper_classes

    def resolve_expression(
        self,
        query=None,
        allow_joins=True,
        reuse=None,
        summarize=False,
        for_save=False,
    ):
        """Validate and re-nest the wrappers, then resolve as a normal Func.

        Raises:
            ValueError: if a wrapper type occurs more than once, or if the
                wrappers are not the topmost expressions.
        """

        def wrapper_names():
            return ", ".join(
                [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
            )

        flattened = list(self.flatten())
        # Split expressions and wrappers.
        inner_nodes, wrappers = partition(
            lambda node: isinstance(node, self.wrapper_classes),
            flattened,
        )
        # Each wrapper type may appear at most once.
        if len({type(wrapper) for wrapper in wrappers}) != len(wrappers):
            raise ValueError(
                "Multiple references to %s can't be used in an indexed "
                "expression." % wrapper_names()
            )
        # The wrappers must sit directly after position 0 of the flattened
        # tree (presumably position 0 is this IndexExpression itself -- confirm
        # against flatten()).
        if flattened[1 : 1 + len(wrappers)] != wrappers:
            raise ValueError(
                "%s must be topmost expressions in an indexed expression."
                % wrapper_names()
            )
        # Wrap expressions in parentheses if they are not column references.
        root = inner_nodes[1]
        resolved_root = root.resolve_expression(
            query,
            allow_joins,
            reuse,
            summarize,
            for_save,
        )
        if not isinstance(resolved_root, Col):
            root = Func(root, template="(%(expressions)s)")

        if wrappers:
            # Order wrappers and set their expressions.
            chain = [
                wrapper.copy()
                for wrapper in sorted(
                    wrappers,
                    key=lambda w: self.wrapper_classes.index(type(w)),
                )
            ]
            for outer, nested in zip(chain, chain[1:]):
                outer.set_source_expressions([nested])
            # Set the root expression on the deepest wrapper.
            chain[-1].set_source_expressions([root])
            top = chain[0]
        else:
            # Use the root expression, if there are no wrappers.
            top = root
        self.set_source_expressions([top])
        return super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Render via the plain as_sql()."""
        # Casting to numeric is unnecessary.
        return self.as_sql(compiler, connection, **extra_context)