plain.models
Model your data and store it in a database.
# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField
class User(models.Model):
email = models.EmailField(unique=True)
password = PasswordField()
is_staff = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.email
Create, update, and delete instances of your models:
from .models import User
# Create a new user
user = User.objects.create(
email="user@example.com",
password="password",
)
# Update a user
user.email = "new@example.com"
user.save()
# Delete a user
user.delete()
# Query for users
staff_users = User.objects.filter(is_staff=True)
Installation
# app/settings.py
INSTALLED_PACKAGES = [
...
"plain.models",
]
To connect to a database, you can provide a `DATABASE_URL` environment variable.
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
Or you can manually define the `DATABASES` setting.
# app/settings.py
DATABASES = {
"default": {
"ENGINE": "plain.models.backends.postgresql",
"NAME": "dbname",
"USER": "user",
"PASSWORD": "password",
"HOST": "localhost",
"PORT": "5432",
}
}
Multiple backends are supported, including Postgres, MySQL, and SQLite.
Querying
Migrations
Fields
Validation
Indexes and constraints
Managers
Forms
1"""
2Various data structures used in query construction.
3
4Factored out from plain.models.query to avoid making the main module very
5large and/or so that they can be used by other modules without getting into
6circular import difficulties.
7"""
8import functools
9import inspect
10import logging
11from collections import namedtuple
12
13from plain.exceptions import FieldError
14from plain.models.constants import LOOKUP_SEP
15from plain.models.db import DEFAULT_DB_ALIAS, DatabaseError, connections
16from plain.utils import tree
17
18logger = logging.getLogger("plain.models")
19
# PathInfo is used when converting lookups (e.g. ``fk__somecol``). Its
# contents describe the relation in Model terms (model Options and Fields
# for both sides of the relation). ``join_field`` is the field backing the
# relation.
PathInfo = namedtuple(
    "PathInfo",
    [
        "from_opts",
        "to_opts",
        "target_fields",
        "join_field",
        "m2m",
        "direct",
        "filtered_relation",
    ],
)
27
28
def subclasses(cls):
    """
    Yield `cls` followed by every direct and indirect subclass, depth-first.

    A class reachable through more than one parent is yielded once per path.
    """
    pending = [cls]
    while pending:
        current = pending.pop()
        yield current
        # Push children reversed so they are popped (and yielded) in the
        # order reported by __subclasses__().
        pending.extend(reversed(current.__subclasses__()))
33
34
class Q(tree.Node):
    """
    A filter condition wrapped as an object so that conditions can be
    combined logically with `&`, `|` and `^`, and negated with `~`.
    """

    # Connection types
    AND = "AND"
    OR = "OR"
    XOR = "XOR"
    default = AND
    conditional = True

    def __init__(self, *args, _connector=None, _negated=False, **kwargs):
        """
        Build a node whose children are the positional arguments followed by
        the keyword arguments as (name, value) pairs sorted by name.
        """
        children = list(args)
        children.extend(sorted(kwargs.items()))
        super().__init__(children=children, connector=_connector, negated=_negated)

    def _combine(self, other, conn):
        """
        Return a new node joining `self` and `other` with connector `conn`.

        Raise TypeError if `other` is not marked `conditional`. If either
        side is empty, a copy of the other side is returned instead.
        """
        if getattr(other, "conditional", False) is False:
            raise TypeError(other)
        if not self:
            return other.copy()
        if not other and isinstance(other, Q):
            return self.copy()

        combined = self.create(connector=conn)
        for operand in (self, other):
            combined.add(operand, conn)
        return combined

    def __or__(self, other):
        return self._combine(other, self.OR)

    def __and__(self, other):
        return self._combine(other, self.AND)

    def __xor__(self, other):
        return self._combine(other, self.XOR)

    def __invert__(self):
        """Return a negated copy; `self` is left untouched."""
        inverted = self.copy()
        inverted.negate()
        return inverted

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """Add this Q to `query` and return the resulting clause."""
        clause, new_joins = query._add_q(
            self,
            reuse,
            allow_joins=allow_joins,
            split_subq=False,
            check_filterable=False,
            summarize=summarize,
        )
        # Any new joins must be promoted to left outer joins so that when Q
        # is used as an expression, rows aren't filtered due to joins.
        query.promote_joins(new_joins)
        return clause

    def flatten(self):
        """
        Recursively yield this Q object and all subexpressions, in depth-first
        order.
        """
        yield self
        for child in self.children:
            # For a (lookup, value) tuple only the value side is visited.
            node = child[1] if isinstance(child, tuple) else child
            if hasattr(node, "flatten"):
                yield from node.flatten()
            else:
                yield node

    def check(self, against, using=DEFAULT_DB_ALIAS):
        """
        Run a database query to test whether this Q matches the values in
        `against` (a mapping of name -> value or expression).

        Return True when the query yields a row. A DatabaseError is logged
        as a warning and also results in True.
        """
        # Avoid circular imports.
        from plain.models.expressions import Value
        from plain.models.fields import BooleanField
        from plain.models.functions import Coalesce
        from plain.models.sql import Query
        from plain.models.sql.constants import SINGLE

        query = Query(None)
        for alias, candidate in against.items():
            # Plain values are wrapped so they can be annotated like expressions.
            expression = (
                candidate
                if hasattr(candidate, "resolve_expression")
                else Value(candidate)
            )
            query.add_annotation(expression, alias, select=False)
        query.add_annotation(Value(1), "_check")
        # This will raise a FieldError if a field is missing in "against".
        if connections[using].features.supports_comparing_boolean_expr:
            query.add_q(Q(Coalesce(self, True, output_field=BooleanField())))
        else:
            query.add_q(self)
        compiler = query.get_compiler(using=using)
        try:
            row = compiler.execute_sql(SINGLE)
        except DatabaseError as e:
            logger.warning("Got a database error calling check() on %r: %s", self, e)
            return True
        return row is not None

    def deconstruct(self):
        """
        Return a (path, args, kwargs) triple describing this node.

        The path is shortened from `plain.models.query_utils` to
        `plain.models`; the connector and negation are included in kwargs
        only when they differ from the defaults.
        """
        klass = self.__class__
        path = f"{klass.__module__}.{klass.__name__}"
        if path.startswith("plain.models.query_utils"):
            path = path.replace("plain.models.query_utils", "plain.models")
        kwargs = {}
        if self.connector != self.default:
            kwargs["_connector"] = self.connector
        if self.negated:
            kwargs["_negated"] = True
        return path, tuple(self.children), kwargs
154
155
class DeferredAttribute:
    """
    Descriptor standing in for a field whose loading was deferred. The first
    read on an instance fetches the value; later reads use the cached copy.
    """

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, cls=None):
        """
        Return the field's value for `instance`, loading and caching it in
        the instance `__dict__` on first access. Accessed on the class, the
        descriptor itself is returned.
        """
        if instance is None:
            return self
        attname = self.field.attname
        cache = instance.__dict__
        if attname in cache:
            return cache[attname]
        # The field may be part of the parent chain, in which case an
        # already loaded value can be reused instead of hitting the database.
        inherited = self._check_parent_chain(instance)
        if inherited is not None:
            cache[attname] = inherited
        else:
            instance.refresh_from_db(fields=[attname])
        return cache[attname]

    def _check_parent_chain(self, instance):
        """
        Return the value of the ancestor link attribute when this field is a
        primary key other than that link field; otherwise return None.
        """
        link_field = instance._meta.get_ancestor_link(self.field.model)
        reusable = self.field.primary_key and self.field != link_field
        if reusable:
            return getattr(instance, link_field.attname)
        return None
195
196
class class_or_instance_method:
    """
    Descriptor used by RegisterLookupMixin that hands back a partial of one
    of two functions, depending on whether it is accessed through the class
    or through an instance.
    """

    def __init__(self, class_method, instance_method):
        self.class_method = class_method
        self.instance_method = instance_method

    def __get__(self, instance, owner):
        # Class access binds the owner class; instance access binds the instance.
        if instance is None:
            target, receiver = self.class_method, owner
        else:
            target, receiver = self.instance_method, instance
        return functools.partial(target, receiver)
211
212
class RegisterLookupMixin:
    """
    Mixin providing a registry of lookups keyed by name, kept per class
    (`class_lookups`) and optionally per instance (`instance_lookups`).
    """

    def _get_lookup(self, lookup_name):
        """Return whatever is registered under `lookup_name`, or None."""
        return self.get_lookups().get(lookup_name)

    @functools.cache
    def get_class_lookups(cls):
        """
        Merge the `class_lookups` dicts defined directly on each class of the
        MRO; entries from classes earlier in the MRO win.
        """
        per_class = [
            klass.__dict__.get("class_lookups", {}) for klass in inspect.getmro(cls)
        ]
        return cls.merge_dicts(per_class)

    def get_instance_lookups(self):
        """Return class lookups overlaid with this instance's own lookups."""
        merged = self.get_class_lookups()
        own = getattr(self, "instance_lookups", None)
        if own:
            return {**merged, **own}
        return merged

    # The descriptor must capture the plain functions, so it is built before
    # get_class_lookups is rebound as a classmethod.
    get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups)
    get_class_lookups = classmethod(get_class_lookups)

    def get_lookup(self, lookup_name):
        """
        Return the Lookup subclass registered as `lookup_name`.

        If nothing is registered and the object has an `output_field`, the
        request is delegated to it. A registered entry that is not a Lookup
        subclass yields None.
        """
        from plain.models.lookups import Lookup

        found = self._get_lookup(lookup_name)
        if found is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_lookup(lookup_name)
            return None
        if not issubclass(found, Lookup):
            return None
        return found

    def get_transform(self, lookup_name):
        """
        Return the Transform subclass registered as `lookup_name`.

        If nothing is registered and the object has an `output_field`, the
        request is delegated to it. A registered entry that is not a
        Transform subclass yields None.
        """
        from plain.models.lookups import Transform

        found = self._get_lookup(lookup_name)
        if found is None:
            if hasattr(self, "output_field"):
                return self.output_field.get_transform(lookup_name)
            return None
        if not issubclass(found, Transform):
            return None
        return found

    @staticmethod
    def merge_dicts(dicts):
        """
        Merge dicts in reverse to preference the order of the original list. e.g.,
        merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'.
        """
        merged = {}
        for mapping in reversed(dicts):
            merged.update(mapping)
        return merged

    @classmethod
    def _clear_cached_class_lookups(cls):
        """Drop the cached class lookups of `cls` and all of its subclasses."""
        for klass in subclasses(cls):
            klass.get_class_lookups.cache_clear()

    def register_class_lookup(cls, lookup, lookup_name=None):
        """
        Register `lookup` on the class under `lookup_name` (defaulting to
        `lookup.lookup_name`), clear the cached lookups, and return `lookup`.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        # Give this class its own dict rather than writing into an inherited one.
        if "class_lookups" not in cls.__dict__:
            cls.class_lookups = {}
        cls.class_lookups[name] = lookup
        cls._clear_cached_class_lookups()
        return lookup

    def register_instance_lookup(self, lookup, lookup_name=None):
        """
        Register `lookup` on this instance only, under `lookup_name`
        (defaulting to `lookup.lookup_name`), and return `lookup`.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        if "instance_lookups" not in self.__dict__:
            self.instance_lookups = {}
        self.instance_lookups[name] = lookup
        return lookup

    # As with get_lookups: build the descriptor first, then wrap as classmethod.
    register_lookup = class_or_instance_method(
        register_class_lookup, register_instance_lookup
    )
    register_class_lookup = classmethod(register_class_lookup)

    def _unregister_class_lookup(cls, lookup, lookup_name=None):
        """
        Remove given lookup from cls lookups. For use in tests only as it's
        not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del cls.class_lookups[name]
        cls._clear_cached_class_lookups()

    def _unregister_instance_lookup(self, lookup, lookup_name=None):
        """
        Remove given lookup from instance lookups. For use in tests only as
        it's not thread-safe.
        """
        name = lookup.lookup_name if lookup_name is None else lookup_name
        del self.instance_lookups[name]

    _unregister_lookup = class_or_instance_method(
        _unregister_class_lookup, _unregister_instance_lookup
    )
    _unregister_class_lookup = classmethod(_unregister_class_lookup)
314
315
def select_related_descend(field, restricted, requested, select_mask, reverse=False):
    """
    Decide whether select_related() should follow `field` one level deeper.

    Used by both the query construction code
    (compiler.get_related_selections()) and the model instance creation code
    (compiler.klass_info).

    Arguments:
     * field - the field to be checked
     * restricted - a boolean, indicating if the field list has been
       manually restricted using a requested clause
     * requested - the select_related() dictionary
     * select_mask - the dictionary of selected fields
     * reverse - boolean, True if we are checking a reverse select related

    Raise FieldError when a requested field is missing from a non-empty
    select_mask, i.e. it is both deferred and traversed.
    """
    remote = field.remote_field
    if not remote:
        return False
    if remote.parent_link and not reverse:
        return False

    if restricted:
        # The name to look for depends on the direction being checked.
        lookup_name = field.related_query_name() if reverse else field.name
        if lookup_name not in requested:
            return False
        if select_mask and field.name in requested and field not in select_mask:
            raise FieldError(
                f"Field {field.model._meta.object_name}.{field.name} cannot be both "
                "deferred and traversed using select_related at the same time."
            )
    elif field.null:
        # Unrestricted traversal does not descend through nullable fields.
        return False
    return True
353
354
def refs_expression(lookup_parts, annotations):
    """
    Find the annotation, if any, that the start of `lookup_parts` refers to.

    Annotation names may themselves contain LOOKUP_SEP, so every prefix of
    `lookup_parts` (shortest first) is joined and looked up in `annotations`.

    Return (annotation_name, remaining_parts) for the first prefix with a
    truthy entry, or (None, ()) when no prefix matches.
    """
    total = len(lookup_parts)
    for end in range(1, total + 1):
        candidate = LOOKUP_SEP.join(lookup_parts[:end])
        if annotations.get(candidate):
            return candidate, lookup_parts[end:]
    return None, ()
366
367
def check_rel_lookup_compatibility(model, target_opts, field):
    """
    Check that `model` is compatible with `target_opts`. Compatibility
    is OK if:
      1) model and opts match (where proxy inheritance is removed)
      2) model is parent of opts' model or the other way around
    """

    def related(opts):
        own_opts = model._meta
        same_concrete = own_opts.concrete_model == opts.concrete_model
        return (
            same_concrete
            or opts.concrete_model in own_opts.get_parent_list()
            or model in opts.get_parent_list()
        )

    direct = related(target_opts)
    if direct:
        return direct

    # If the field is a primary key, then doing a query against the field's
    # model is ok, too. Consider the case:
    # class Restaurant(models.Model):
    #     place = OneToOneField(Place, primary_key=True):
    # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
    # If we didn't have the primary key check, then pk__in (== place__in) would
    # give Place's opts as the target opts, but Restaurant isn't compatible
    # with that. This logic applies only to primary keys, as when doing __in=qs,
    # we are going to turn this into __in=qs.values('pk') later on.
    return getattr(field, "primary_key", False) and related(field.model._meta)
395
396
class FilteredRelation:
    """Specify custom filtering in the ON clause of SQL joins."""

    def __init__(self, relation_name, *, condition=Q()):
        """
        Store the relation name and its filter condition.

        Raise ValueError if `relation_name` is empty or `condition` is not a
        Q instance.
        """
        if not relation_name:
            raise ValueError("relation_name cannot be empty.")
        self.relation_name = relation_name
        self.alias = None
        if not isinstance(condition, Q):
            raise ValueError("condition argument must be a Q() instance.")
        self.condition = condition
        self.path = []

    def __eq__(self, other):
        """Equal when relation name, alias and condition all match."""
        if isinstance(other, self.__class__):
            return (
                self.relation_name == other.relation_name
                and self.alias == other.alias
                and self.condition == other.condition
            )
        return NotImplemented

    def clone(self):
        """Return a copy sharing the condition but with its own path list."""
        duplicate = FilteredRelation(self.relation_name, condition=self.condition)
        duplicate.alias = self.alias
        duplicate.path = self.path[:]
        return duplicate

    def resolve_expression(self, *args, **kwargs):
        """
        QuerySet.annotate() only accepts expression-like arguments
        (with a resolve_expression() method).
        """
        raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")

    def as_sql(self, compiler, connection):
        """Compile the condition built against the compiler's query."""
        # Resolve the condition in Join.filtered_relation.
        reusable = set(self.path)
        where = compiler.query.build_filtered_relation_q(self.condition, reuse=reusable)
        return compiler.compile(where)