Plain is headed towards 1.0! Subscribe for development updates →

plain.models

Model your data and store it in a database.

# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField


class User(models.Model):
    email = models.EmailField()
    password = PasswordField()
    is_admin = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.email

Create, update, and delete instances of your models:

from .models import User


# Create a new user
user = User.objects.create(
    email="user@example.com",
    password="password",
)

# Update a user
user.email = "new@example.com"
user.save()

# Delete a user
user.delete()

# Query for users
admin_users = User.objects.filter(is_admin=True)

Installation

# app/settings.py
INSTALLED_PACKAGES = [
    ...
    "plain.models",
]

To connect to a database, you can provide a DATABASE_URL environment variable.

DATABASE_URL=postgresql://user:password@localhost:5432/dbname

Or you can manually define the DATABASES setting.

# app/settings.py
DATABASES = {
    "default": {
        "ENGINE": "plain.models.backends.postgresql",
        "NAME": "dbname",
        "USER": "user",
        "PASSWORD": "password",
        "HOST": "localhost",
        "PORT": "5432",
    }
}

Multiple backends are supported, including Postgres, MySQL, and SQLite.

Querying

Migrations

Migration docs

Fields

Field docs

Validation

Indexes and constraints

Managers

Forms

  1"""
  2Various data structures used in query construction.
  3
  4Factored out from plain.models.query to avoid making the main module very
  5large and/or so that they can be used by other modules without getting into
  6circular import difficulties.
  7"""
  8
  9import functools
 10import inspect
 11import logging
 12from collections import namedtuple
 13
 14from plain.exceptions import FieldError
 15from plain.models.constants import LOOKUP_SEP
 16from plain.models.db import DEFAULT_DB_ALIAS, DatabaseError, connections
 17from plain.utils import tree
 18
 19logger = logging.getLogger("plain.models")
 20
 21# PathInfo is used when converting lookups (fk__somecol). The contents
 22# describe the relation in Model terms (model Options and Fields for both
 23# sides of the relation. The join_field is the field backing the relation.
 24PathInfo = namedtuple(
 25    "PathInfo",
 26    "from_opts to_opts target_fields join_field m2m direct filtered_relation",
 27)
 28
 29
 30def subclasses(cls):
 31    yield cls
 32    for subclass in cls.__subclasses__():
 33        yield from subclasses(subclass)
 34
 35
 36class Q(tree.Node):
 37    """
 38    Encapsulate filters as objects that can then be combined logically (using
 39    `&` and `|`).
 40    """
 41
 42    # Connection types
 43    AND = "AND"
 44    OR = "OR"
 45    XOR = "XOR"
 46    default = AND
 47    conditional = True
 48
 49    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
 50        super().__init__(
 51            children=[*args, *sorted(kwargs.items())],
 52            connector=_connector,
 53            negated=_negated,
 54        )
 55
 56    def _combine(self, other, conn):
 57        if getattr(other, "conditional", False) is False:
 58            raise TypeError(other)
 59        if not self:
 60            return other.copy()
 61        if not other and isinstance(other, Q):
 62            return self.copy()
 63
 64        obj = self.create(connector=conn)
 65        obj.add(self, conn)
 66        obj.add(other, conn)
 67        return obj
 68
 69    def __or__(self, other):
 70        return self._combine(other, self.OR)
 71
 72    def __and__(self, other):
 73        return self._combine(other, self.AND)
 74
 75    def __xor__(self, other):
 76        return self._combine(other, self.XOR)
 77
 78    def __invert__(self):
 79        obj = self.copy()
 80        obj.negate()
 81        return obj
 82
 83    def resolve_expression(
 84        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
 85    ):
 86        # We must promote any new joins to left outer joins so that when Q is
 87        # used as an expression, rows aren't filtered due to joins.
 88        clause, joins = query._add_q(
 89            self,
 90            reuse,
 91            allow_joins=allow_joins,
 92            split_subq=False,
 93            check_filterable=False,
 94            summarize=summarize,
 95        )
 96        query.promote_joins(joins)
 97        return clause
 98
 99    def flatten(self):
100        """
101        Recursively yield this Q object and all subexpressions, in depth-first
102        order.
103        """
104        yield self
105        for child in self.children:
106            if isinstance(child, tuple):
107                # Use the lookup.
108                child = child[1]
109            if hasattr(child, "flatten"):
110                yield from child.flatten()
111            else:
112                yield child
113
114    def check(self, against, using=DEFAULT_DB_ALIAS):
115        """
116        Do a database query to check if the expressions of the Q instance
117        matches against the expressions.
118        """
119        # Avoid circular imports.
120        from plain.models.expressions import Value
121        from plain.models.fields import BooleanField
122        from plain.models.functions import Coalesce
123        from plain.models.sql import Query
124        from plain.models.sql.constants import SINGLE
125
126        query = Query(None)
127        for name, value in against.items():
128            if not hasattr(value, "resolve_expression"):
129                value = Value(value)
130            query.add_annotation(value, name, select=False)
131        query.add_annotation(Value(1), "_check")
132        # This will raise a FieldError if a field is missing in "against".
133        if connections[using].features.supports_comparing_boolean_expr:
134            query.add_q(Q(Coalesce(self, True, output_field=BooleanField())))
135        else:
136            query.add_q(self)
137        compiler = query.get_compiler(using=using)
138        try:
139            return compiler.execute_sql(SINGLE) is not None
140        except DatabaseError as e:
141            logger.warning("Got a database error calling check() on %r: %s", self, e)
142            return True
143
144    def deconstruct(self):
145        path = f"{self.__class__.__module__}.{self.__class__.__name__}"
146        if path.startswith("plain.models.query_utils"):
147            path = path.replace("plain.models.query_utils", "plain.models")
148        args = tuple(self.children)
149        kwargs = {}
150        if self.connector != self.default:
151            kwargs["_connector"] = self.connector
152        if self.negated:
153            kwargs["_negated"] = True
154        return path, args, kwargs
155
156
class DeferredAttribute:
    """
    Descriptor standing in for a field whose value has not been loaded yet.

    Reading the attribute on an instance that lacks the value triggers
    `instance.refresh_from_db()` for that single field.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Return the field's value from the instance `__dict__`, loading it via
        `refresh_from_db()` first when it is absent. Accessed on the class
        (no instance), return the descriptor itself.
        """
        if instance is None:
            return self
        attname = self.field.attname
        values = instance.__dict__
        try:
            return values[attname]
        except KeyError:
            instance.refresh_from_db(fields=[attname])
        return values[attname]
178
179
class class_or_instance_method:
    """
    Descriptor that picks one of two implementations depending on whether
    the attribute is looked up on the class or on an instance.

    Used by RegisterLookupMixin; in both cases a `functools.partial` with
    the class or instance pre-bound as first argument is returned.
    """

    def __init__(self, class_method, instance_method):
        self.class_method = class_method
        self.instance_method = instance_method

    def __get__(self, instance, owner):
        if instance is not None:
            return functools.partial(self.instance_method, instance)
        # Looked up on the class itself: bind the owner class instead.
        return functools.partial(self.class_method, owner)
194
195
class RegisterLookupMixin:
    """
    Mixin that stores lookups and transforms by name, both per class
    (`class_lookups`) and per instance (`instance_lookups`).

    `get_lookups`, `register_lookup` and `_unregister_lookup` dispatch to
    the class-level or instance-level implementation depending on whether
    they are accessed on the class or on an instance.
    """

    def _get_lookup(self, lookup_name):
        return self.get_lookups().get(lookup_name)

    @functools.cache
    def get_class_lookups(cls):
        """Return the merged `class_lookups` of `cls` and all its bases."""
        per_class = []
        for klass in inspect.getmro(cls):
            # Read each class's own __dict__ so inherited values aren't
            # counted twice.
            per_class.append(klass.__dict__.get("class_lookups", {}))
        return cls.merge_dicts(per_class)

    def get_instance_lookups(self):
        """Return class lookups overlaid with this instance's own lookups."""
        merged = self.get_class_lookups()
        own = getattr(self, "instance_lookups", None)
        if not own:
            return merged
        return {**merged, **own}

    # The dual-dispatch attribute is built from the undecorated function
    # (it passes the owner class explicitly), so it must be created before
    # the name is rebound as a classmethod.
    get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups)
    get_class_lookups = classmethod(get_class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup class registered as `lookup_name`.

        Falls back to `self.output_field` when nothing is registered here
        and that attribute exists. Return None if the name is unknown or
        refers to something that is not a Lookup subclass.
        """
        from plain.models.lookups import Lookup

        found = self._get_lookup(lookup_name)
        if found is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_lookup(lookup_name)
            return None
        return found if issubclass(found, Lookup) else None

    def get_transform(self, lookup_name):
        """
        Return the Transform class registered as `lookup_name`.

        Falls back to `self.output_field` when nothing is registered here
        and that attribute exists. Return None if the name is unknown or
        refers to something that is not a Transform subclass.
        """
        from plain.models.lookups import Transform

        found = self._get_lookup(lookup_name)
        if found is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_transform(lookup_name)
            return None
        return found if issubclass(found, Transform) else None

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts so that earlier entries of the list win, e.g.
        merge_dicts([a, b]) prefers the keys in 'a' over those in 'b'.
        """
        # Walking the list backwards lets earlier dicts overwrite later ones.
        return {
            key: value for mapping in reversed(dicts) for key, value in mapping.items()
        }

    @classmethod
    def _clear_cached_class_lookups(cls):
        """Drop the cached class lookups of `cls` and every subclass."""
        for klass in subclasses(cls):
            klass.get_class_lookups.cache_clear()

    def register_class_lookup(cls, lookup, lookup_name=None):
        """Register `lookup` on the class and return it."""
        name = lookup.lookup_name if lookup_name is None else lookup_name
        # Give this class its own dict rather than mutating one inherited
        # from a base class.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[name] = lookup
        cls._clear_cached_class_lookups()
        return lookup

    def register_instance_lookup(self, lookup, lookup_name=None):
        """Register `lookup` on this instance only and return it."""
        name = lookup.lookup_name if lookup_name is None else lookup_name
        if "instance_lookups" not in self.__dict__:
            self.instance_lookups = {}
        self.instance_lookups[name] = lookup
        return lookup

    register_lookup = class_or_instance_method(
        register_class_lookup, register_instance_lookup
    )
    register_class_lookup = classmethod(register_class_lookup)

    def _unregister_class_lookup(cls, lookup, lookup_name=None):
        """
        Remove given lookup from cls lookups. For use in tests only as it's
        not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del cls.class_lookups[name]
        cls._clear_cached_class_lookups()

    def _unregister_instance_lookup(self, lookup, lookup_name=None):
        """
        Remove given lookup from instance lookups. For use in tests only as
        it's not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del self.instance_lookups[name]

    _unregister_lookup = class_or_instance_method(
        _unregister_class_lookup, _unregister_instance_lookup
    )
    _unregister_class_lookup = classmethod(_unregister_class_lookup)
297
298
def select_related_descend(field, restricted, requested, select_mask, reverse=False):
    """
    Decide whether select_related() should follow `field` one level deeper.
    Used by both the query construction code
    (compiler.get_related_selections()) and the model instance creation code
    (compiler.klass_info).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean, indicating if the field list has been
       manually restricted using a requested clause
     * requested - the select_related() dictionary
     * select_mask - the dictionary of selected fields
     * reverse - boolean, True if we are checking a reverse select related

    Raise FieldError when the field is explicitly requested but missing
    from a non-empty select_mask.
    """
    relation = field.remote_field
    if not relation:
        return False
    if relation.parent_link and not reverse:
        return False

    if not restricted:
        # Without an explicit field list, fields that allow null are not
        # followed.
        return not field.allow_null

    # Reverse relations are requested by their related query name.
    wanted_name = field.related_query_name() if reverse else field.name
    if wanted_name not in requested:
        return False
    if select_mask and field.name in requested and field not in select_mask:
        raise FieldError(
            f"Field {field.model._meta.object_name}.{field.name} cannot be both "
            "deferred and traversed using select_related at the same time."
        )
    return True
336
337
def refs_expression(lookup_parts, annotations):
    """
    Find the shortest prefix of `lookup_parts` that names an annotation.

    Annotation names may themselves contain LOOKUP_SEP, so each prefix is
    joined back together and looked up in turn. Return a
    `(annotation_name, remaining_parts)` pair, or `(None, ())` when no
    prefix maps to a truthy entry in `annotations`.
    """
    for split_at in range(1, len(lookup_parts) + 1):
        candidate = LOOKUP_SEP.join(lookup_parts[:split_at])
        if annotations.get(candidate):
            return candidate, lookup_parts[split_at:]
    return None, ()
349
350
def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that `model` is compatible with `target_opts`.

    They are compatible when both resolve to the same concrete model, or
    when `field` is a primary key and the model that owns the field
    resolves to the same concrete model as `model`.
    """
    concrete_model = model._meta.concrete_model

    def same_concrete_model(opts):
        return concrete_model == opts.concrete_model

    direct_match = same_concrete_model(target_opts)
    if direct_match:
        return direct_match

    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # If we didn't have the primary key check, then pk__in (== place__in) would
    # give Place's opts as the target opts, but Restaurant isn't compatible
    # with that. This logic applies only to primary keys, as when doing __in=qs,
    # we are going to turn this into __in=qs.values('pk') later on.
    return getattr(field, "primary_key", False) and same_concrete_model(
        field.model._meta
    )
374
375
class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    def __init__(self, relation_name, *, condition=Q()):
        """
        Store the relation name and its filter condition.

        Raise ValueError when `relation_name` is empty or `condition` is not
        a Q instance.
        """
        # NOTE(review): the default Q() is evaluated once, when this method
        # is defined, so instances created without an explicit condition
        # share that single Q object.
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.relation_name = relation_name
        self.alias = None
        self.condition = condition
        self.path = []

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return NotImplemented
        same_relation = self.relation_name == other.relation_name
        return (
            same_relation
            and self.alias == other.alias
            and self.condition == other.condition
        )

    def clone(self):
        """Return a copy with the same condition object and a copied path."""
        duplicate = FilteredRelation(self.relation_name, condition=self.condition)
        duplicate.alias = self.alias
        duplicate.path = self.path[:]
        return duplicate

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method).
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        """Compile the stored condition against the compiler's query."""
        # Resolve the condition in Join.filtered_relation.
        where = compiler.query.build_filtered_relation_q(
            self.condition, reuse=set(self.path)
        )
        return compiler.compile(where)