Plain is headed towards 1.0! Subscribe for development updates →

plain.models

Model your data and store it in a database.

# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField


class User(models.Model):
    email = models.EmailField(unique=True)
    password = PasswordField()
    is_staff = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.email

Create, update, and delete instances of your models:

from .models import User


# Create a new user
user = User.objects.create(
    email="test@example.com",
    password="password",
)

# Update a user
user.email = "new@example.com"
user.save()

# Delete a user
user.delete()

# Query for users
staff_users = User.objects.filter(is_staff=True)

Installation

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.models",
]

To connect to a database, you can provide a DATABASE_URL environment variable.

DATABASE_URL=postgresql://user:password@localhost:5432/dbname

Or you can manually define the DATABASES setting.

# app/settings.py
DATABASES = {
    "default": {
        "ENGINE": "plain.models.backends.postgresql",
        "NAME": "dbname",
        "USER": "user",
        "PASSWORD": "password",
        "HOST": "localhost",
        "PORT": "5432",
    }
}

Multiple backends are supported, including Postgres, MySQL, and SQLite.

Querying

Migrations

Migration docs

Fields

Field docs

Validation

Indexes and constraints

Managers

Forms

  1"""
  2Various data structures used in query construction.
  3
  4Factored out from plain.models.query to avoid making the main module very
  5large and/or so that they can be used by other modules without getting into
  6circular import difficulties.
  7"""
  8import functools
  9import inspect
 10import logging
 11from collections import namedtuple
 12
 13from plain.exceptions import FieldError
 14from plain.models.constants import LOOKUP_SEP
 15from plain.models.db import DEFAULT_DB_ALIAS, DatabaseError, connections
 16from plain.utils import tree
 17
 18logger = logging.getLogger("plain.models")
 19
 20# PathInfo is used when converting lookups (fk__somecol). The contents
 21# describe the relation in Model terms (model Options and Fields for both
 22# sides of the relation. The join_field is the field backing the relation.
 23PathInfo = namedtuple(
 24    "PathInfo",
 25    "from_opts to_opts target_fields join_field m2m direct filtered_relation",
 26)
 27
 28
 29def subclasses(cls):
 30    yield cls
 31    for subclass in cls.__subclasses__():
 32        yield from subclasses(subclass)
 33
 34
 35class Q(tree.Node):
 36    """
 37    Encapsulate filters as objects that can then be combined logically (using
 38    `&` and `|`).
 39    """
 40
 41    # Connection types
 42    AND = "AND"
 43    OR = "OR"
 44    XOR = "XOR"
 45    default = AND
 46    conditional = True
 47
 48    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
 49        super().__init__(
 50            children=[*args, *sorted(kwargs.items())],
 51            connector=_connector,
 52            negated=_negated,
 53        )
 54
 55    def _combine(self, other, conn):
 56        if getattr(other, "conditional", False) is False:
 57            raise TypeError(other)
 58        if not self:
 59            return other.copy()
 60        if not other and isinstance(other, Q):
 61            return self.copy()
 62
 63        obj = self.create(connector=conn)
 64        obj.add(self, conn)
 65        obj.add(other, conn)
 66        return obj
 67
 68    def __or__(self, other):
 69        return self._combine(other, self.OR)
 70
 71    def __and__(self, other):
 72        return self._combine(other, self.AND)
 73
 74    def __xor__(self, other):
 75        return self._combine(other, self.XOR)
 76
 77    def __invert__(self):
 78        obj = self.copy()
 79        obj.negate()
 80        return obj
 81
 82    def resolve_expression(
 83        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
 84    ):
 85        # We must promote any new joins to left outer joins so that when Q is
 86        # used as an expression, rows aren't filtered due to joins.
 87        clause, joins = query._add_q(
 88            self,
 89            reuse,
 90            allow_joins=allow_joins,
 91            split_subq=False,
 92            check_filterable=False,
 93            summarize=summarize,
 94        )
 95        query.promote_joins(joins)
 96        return clause
 97
 98    def flatten(self):
 99        """
100        Recursively yield this Q object and all subexpressions, in depth-first
101        order.
102        """
103        yield self
104        for child in self.children:
105            if isinstance(child, tuple):
106                # Use the lookup.
107                child = child[1]
108            if hasattr(child, "flatten"):
109                yield from child.flatten()
110            else:
111                yield child
112
113    def check(self, against, using=DEFAULT_DB_ALIAS):
114        """
115        Do a database query to check if the expressions of the Q instance
116        matches against the expressions.
117        """
118        # Avoid circular imports.
119        from plain.models.expressions import Value
120        from plain.models.fields import BooleanField
121        from plain.models.functions import Coalesce
122        from plain.models.sql import Query
123        from plain.models.sql.constants import SINGLE
124
125        query = Query(None)
126        for name, value in against.items():
127            if not hasattr(value, "resolve_expression"):
128                value = Value(value)
129            query.add_annotation(value, name, select=False)
130        query.add_annotation(Value(1), "_check")
131        # This will raise a FieldError if a field is missing in "against".
132        if connections[using].features.supports_comparing_boolean_expr:
133            query.add_q(Q(Coalesce(self, True, output_field=BooleanField())))
134        else:
135            query.add_q(self)
136        compiler = query.get_compiler(using=using)
137        try:
138            return compiler.execute_sql(SINGLE) is not None
139        except DatabaseError as e:
140            logger.warning("Got a database error calling check() on %r: %s", self, e)
141            return True
142
143    def deconstruct(self):
144        path = f"{self.__class__.__module__}.{self.__class__.__name__}"
145        if path.startswith("plain.models.query_utils"):
146            path = path.replace("plain.models.query_utils", "plain.models")
147        args = tuple(self.children)
148        kwargs = {}
149        if self.connector != self.default:
150            kwargs["_connector"] = self.connector
151        if self.negated:
152            kwargs["_negated"] = True
153        return path, args, kwargs
154
155
class DeferredAttribute:
    """
    Descriptor standing in for a field whose loading has been deferred. The
    value is fetched the first time it is read from an instance.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Return the field's value for instance, loading it on first access and
        storing it in the instance's __dict__. Accessed on the class, return
        the descriptor itself.
        """
        if instance is None:
            return self
        attname = self.field.attname
        loaded = instance.__dict__
        if attname in loaded:
            return loaded[attname]
        # The field may be part of the parent chain, in which case the
        # already loaded value can be reused. Refs #18343.
        inherited = self._check_parent_chain(instance)
        if inherited is not None:
            loaded[attname] = inherited
        else:
            instance.refresh_from_db(fields=[attname])
        return loaded[attname]

    def _check_parent_chain(self, instance):
        """
        Return the value of the ancestor link field on instance when this
        field is a primary key other than that link field; otherwise return
        None.
        """
        link_field = instance._meta.get_ancestor_link(self.field.model)
        is_linked_primary_key = self.field.primary_key and self.field != link_field
        return getattr(instance, link_field.attname) if is_linked_primary_key else None
195
196
class class_or_instance_method:
    """
    Descriptor used by RegisterLookupMixin that hands back a partial of one
    of two functions, chosen by whether the attribute is read from the class
    or from an instance.
    """

    def __init__(self, class_method, instance_method):
        self.class_method = class_method
        self.instance_method = instance_method

    def __get__(self, instance, owner):
        # Read from an instance: bind the instance-level function to it.
        if instance is not None:
            return functools.partial(self.instance_method, instance)
        # Read from the class: bind the class-level function to the owner.
        return functools.partial(self.class_method, owner)
211
212
class RegisterLookupMixin:
    """
    Mixin that keeps a registry of lookups, either per class (merged along
    the MRO) or per instance.
    """

    def _get_lookup(self, lookup_name):
        """Return whatever is registered under lookup_name, or None."""
        registry = self.get_lookups()
        return registry.get(lookup_name, None)

    @functools.cache
    def get_class_lookups(cls):
        """Return the class_lookups of cls and its bases merged into one dict."""
        per_class = [
            klass.__dict__.get("class_lookups", {}) for klass in inspect.getmro(cls)
        ]
        return cls.merge_dicts(per_class)

    def get_instance_lookups(self):
        """Return the class lookups overlaid with this instance's own lookups."""
        merged = self.get_class_lookups()
        own = getattr(self, "instance_lookups", None)
        if own:
            return {**merged, **own}
        return merged

    # The descriptor needs the plain function, so it is created before
    # get_class_lookups is rebound as a classmethod.
    get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups)
    get_class_lookups = classmethod(get_class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup registered as lookup_name, deferring to
        self.output_field (when present) if nothing is registered here.
        Return None if the registered entry is not a Lookup subclass.
        """
        from plain.models.lookups import Lookup

        candidate = self._get_lookup(lookup_name)
        if candidate is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_lookup(lookup_name)
            return None
        return candidate if issubclass(candidate, Lookup) else None

    def get_transform(self, lookup_name):
        """
        Return the Transform registered as lookup_name, deferring to
        self.output_field (when present) if nothing is registered here.
        Return None if the registered entry is not a Transform subclass.
        """
        from plain.models.lookups import Transform

        candidate = self._get_lookup(lookup_name)
        if candidate is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_transform(lookup_name)
            return None
        return candidate if issubclass(candidate, Transform) else None

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts so that earlier entries of the list win. e.g.,
        merge_dicts([a, b]) will prefer the keys in 'a' over those in 'b'.
        """
        combined = {}
        # Apply the list back to front so that earlier dicts overwrite later
        # ones.
        for mapping in reversed(dicts):
            combined.update(mapping)
        return combined

    @classmethod
    def _clear_cached_class_lookups(cls):
        """Drop the cached class lookups of cls and of all its subclasses."""
        for klass in subclasses(cls):
            klass.get_class_lookups.cache_clear()

    def register_class_lookup(cls, lookup, lookup_name=None):
        """Register lookup on cls under lookup_name and return the lookup."""
        name = lookup.lookup_name if lookup_name is None else lookup_name
        # The class gets a dict of its own when it has not defined one itself.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[name] = lookup
        cls._clear_cached_class_lookups()
        return lookup

    def register_instance_lookup(self, lookup, lookup_name=None):
        """Register lookup on this instance only and return the lookup."""
        name = lookup.lookup_name if lookup_name is None else lookup_name
        if "instance_lookups" not in self.__dict__:
            self.instance_lookups = {}
        self.instance_lookups[name] = lookup
        return lookup

    # As above: build the descriptor from the plain function first.
    register_lookup = class_or_instance_method(
        register_class_lookup, register_instance_lookup
    )
    register_class_lookup = classmethod(register_class_lookup)

    def _unregister_class_lookup(cls, lookup, lookup_name=None):
        """
        Remove the given lookup from the lookups of cls. For use in tests
        only as it's not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del cls.class_lookups[name]
        cls._clear_cached_class_lookups()

    def _unregister_instance_lookup(self, lookup, lookup_name=None):
        """
        Remove the given lookup from this instance's lookups. For use in
        tests only as it's not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del self.instance_lookups[name]

    _unregister_lookup = class_or_instance_method(
        _unregister_class_lookup, _unregister_instance_lookup
    )
    _unregister_class_lookup = classmethod(_unregister_class_lookup)
314
315
def select_related_descend(field, restricted, requested, select_mask, reverse=False):
    """
    Decide whether select_related() should descend through this field. Used
    by both the query construction code (compiler.get_related_selections())
    and the model instance creation code (compiler.klass_info).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean, indicating whether the field list has been
       manually restricted using a requested clause
     * requested - the select_related() dictionary
     * select_mask - the dictionary of selected fields
     * reverse - boolean, True if we are checking a reverse select related

    Raise FieldError when the field is both requested and missing from a
    non-empty select_mask.
    """
    remote = field.remote_field
    if not remote:
        return False
    if remote.parent_link and not reverse:
        return False

    if not restricted:
        # Without an explicit field list, nullable relations are skipped.
        return not field.null

    # With an explicit field list, the relation must have been asked for:
    # by its related query name when reversed, by field name otherwise.
    wanted_name = field.related_query_name() if reverse else field.name
    if wanted_name not in requested:
        return False
    if select_mask and field.name in requested and field not in select_mask:
        raise FieldError(
            f"Field {field.model._meta.object_name}.{field.name} cannot be both "
            "deferred and traversed using select_related at the same time."
        )
    return True
353
354
def refs_expression(lookup_parts, annotations):
    """
    Find the annotation, if any, that lookup_parts refers to.

    Because the LOOKUP_SEP is contained in the default annotation names, every
    prefix of lookup_parts is tried, shortest first. Return the matching
    annotation name together with the remaining parts, or (None, ()) when no
    prefix matches.
    """
    for end in range(1, len(lookup_parts) + 1):
        candidate = LOOKUP_SEP.join(lookup_parts[:end])
        if annotations.get(candidate):
            return candidate, lookup_parts[end:]
    return None, ()
366
367
def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that model is compatible with target_opts. Compatibility
    is OK if:
      1) model and opts match (where proxy inheritance is removed)
      2) model is parent of opts' model or the other way around
    """

    def is_compatible(opts):
        if model._meta.concrete_model == opts.concrete_model:
            return True
        if opts.concrete_model in model._meta.get_parent_list():
            return True
        return model in opts.get_parent_list()

    if is_compatible(target_opts):
        return True

    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # Without the primary key check, pk__in (== place__in) would give Place's
    # opts as the target opts, but Restaurant isn't compatible with that.
    # This logic applies only to primary keys, as when doing __in=qs, we are
    # going to turn this into __in=qs.values('pk') later on.
    return getattr(field, "primary_key", False) and is_compatible(field.model._meta)
395
396
class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    # NOTE: the default Q() is created once, when the class is defined, and
    # is shared by every instance built without an explicit condition.
    def __init__(self, relation_name, *, condition=Q()):
        """
        Store the relation name and its condition.

        Raise ValueError when relation_name is empty or when condition is not
        a Q instance.
        """
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.relation_name = relation_name
        self.alias = None
        self.condition = condition
        self.path = []

    def __eq__(self, other):
        # Equality covers the relation name, alias and condition; the path is
        # not compared.
        if not isinstance(other, self.__class__):
            return NotImplemented
        return (
            self.relation_name == other.relation_name
            and self.alias == other.alias
            and self.condition == other.condition
        )

    def clone(self):
        """Return a new FilteredRelation with the same name, condition, alias
        and a copy of the path."""
        duplicate = FilteredRelation(self.relation_name, condition=self.condition)
        duplicate.alias = self.alias
        duplicate.path = self.path[:]
        return duplicate

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method), so the method exists, but it
        always raises NotImplementedError.
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        """Compile the condition through the compiler's query and return the
        result of compiler.compile()."""
        # Resolve the condition in Join.filtered_relation.
        reusable = set(self.path)
        where = compiler.query.build_filtered_relation_q(self.condition, reuse=reusable)
        return compiler.compile(where)