plain.models
Model your data and store it in a database.
# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField
class User(models.Model):
email = models.EmailField()
password = PasswordField()
is_admin = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.email
Create, update, and delete instances of your models:
from .models import User
# Create a new user
user = User.objects.create(
email="[email protected]",
password="password",
)
# Update a user
user.email = "[email protected]"
user.save()
# Delete a user
user.delete()
# Query for users
admin_users = User.objects.filter(is_admin=True)
Installation
# app/settings.py
INSTALLED_PACKAGES = [
...
"plain.models",
]
To connect to a database, you can provide a DATABASE_URL
environment variable.
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
Or you can manually define the DATABASES
setting.
# app/settings.py
DATABASES = {
"default": {
"ENGINE": "plain.models.backends.postgresql",
"NAME": "dbname",
"USER": "user",
"PASSWORD": "password",
"HOST": "localhost",
"PORT": "5432",
}
}
Multiple backends are supported, including Postgres, MySQL, and SQLite.
Querying
Migrations
Fields
Validation
Indexes and constraints
Managers
Forms
1"""
2Various data structures used in query construction.
3
4Factored out from plain.models.query to avoid making the main module very
5large and/or so that they can be used by other modules without getting into
6circular import difficulties.
7"""
8
9import functools
10import inspect
11import logging
12from collections import namedtuple
13
14from plain.exceptions import FieldError
15from plain.models.constants import LOOKUP_SEP
16from plain.models.db import DEFAULT_DB_ALIAS, DatabaseError, connections
17from plain.utils import tree
18
19logger = logging.getLogger("plain.models")
20
21# PathInfo is used when converting lookups (fk__somecol). The contents
22# describe the relation in Model terms (model Options and Fields for both
23# sides of the relation. The join_field is the field backing the relation.
24PathInfo = namedtuple(
25 "PathInfo",
26 "from_opts to_opts target_fields join_field m2m direct filtered_relation",
27)
28
29
30def subclasses(cls):
31 yield cls
32 for subclass in cls.__subclasses__():
33 yield from subclasses(subclass)
34
35
36class Q(tree.Node):
37 """
38 Encapsulate filters as objects that can then be combined logically (using
39 `&` and `|`).
40 """
41
42 # Connection types
43 AND = "AND"
44 OR = "OR"
45 XOR = "XOR"
46 default = AND
47 conditional = True
48
49 def __init__(self, *args, _connector=None, _negated=False, **kwargs):
50 super().__init__(
51 children=[*args, *sorted(kwargs.items())],
52 connector=_connector,
53 negated=_negated,
54 )
55
56 def _combine(self, other, conn):
57 if getattr(other, "conditional", False) is False:
58 raise TypeError(other)
59 if not self:
60 return other.copy()
61 if not other and isinstance(other, Q):
62 return self.copy()
63
64 obj = self.create(connector=conn)
65 obj.add(self, conn)
66 obj.add(other, conn)
67 return obj
68
69 def __or__(self, other):
70 return self._combine(other, self.OR)
71
72 def __and__(self, other):
73 return self._combine(other, self.AND)
74
75 def __xor__(self, other):
76 return self._combine(other, self.XOR)
77
78 def __invert__(self):
79 obj = self.copy()
80 obj.negate()
81 return obj
82
83 def resolve_expression(
84 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
85 ):
86 # We must promote any new joins to left outer joins so that when Q is
87 # used as an expression, rows aren't filtered due to joins.
88 clause, joins = query._add_q(
89 self,
90 reuse,
91 allow_joins=allow_joins,
92 split_subq=False,
93 check_filterable=False,
94 summarize=summarize,
95 )
96 query.promote_joins(joins)
97 return clause
98
99 def flatten(self):
100 """
101 Recursively yield this Q object and all subexpressions, in depth-first
102 order.
103 """
104 yield self
105 for child in self.children:
106 if isinstance(child, tuple):
107 # Use the lookup.
108 child = child[1]
109 if hasattr(child, "flatten"):
110 yield from child.flatten()
111 else:
112 yield child
113
114 def check(self, against, using=DEFAULT_DB_ALIAS):
115 """
116 Do a database query to check if the expressions of the Q instance
117 matches against the expressions.
118 """
119 # Avoid circular imports.
120 from plain.models.expressions import Value
121 from plain.models.fields import BooleanField
122 from plain.models.functions import Coalesce
123 from plain.models.sql import Query
124 from plain.models.sql.constants import SINGLE
125
126 query = Query(None)
127 for name, value in against.items():
128 if not hasattr(value, "resolve_expression"):
129 value = Value(value)
130 query.add_annotation(value, name, select=False)
131 query.add_annotation(Value(1), "_check")
132 # This will raise a FieldError if a field is missing in "against".
133 if connections[using].features.supports_comparing_boolean_expr:
134 query.add_q(Q(Coalesce(self, True, output_field=BooleanField())))
135 else:
136 query.add_q(self)
137 compiler = query.get_compiler(using=using)
138 try:
139 return compiler.execute_sql(SINGLE) is not None
140 except DatabaseError as e:
141 logger.warning("Got a database error calling check() on %r: %s", self, e)
142 return True
143
144 def deconstruct(self):
145 path = f"{self.__class__.__module__}.{self.__class__.__name__}"
146 if path.startswith("plain.models.query_utils"):
147 path = path.replace("plain.models.query_utils", "plain.models")
148 args = tuple(self.children)
149 kwargs = {}
150 if self.connector != self.default:
151 kwargs["_connector"] = self.connector
152 if self.negated:
153 kwargs["_negated"] = True
154 return path, args, kwargs
155
156
157class DeferredAttribute:
158 """
159 A wrapper for a deferred-loading field. When the value is read from this
160 object the first time, the query is executed.
161 """
162
163 def __init__(self, field):
164 self.field = field
165
166 def __get__(self, instance, cls=None):
167 """
168 Retrieve and caches the value from the datastore on the first lookup.
169 Return the cached value.
170 """
171 if instance is None:
172 return self
173 data = instance.__dict__
174 field_name = self.field.attname
175 if field_name not in data:
176 instance.refresh_from_db(fields=[field_name])
177 return data[field_name]
178
179
180class class_or_instance_method:
181 """
182 Hook used in RegisterLookupMixin to return partial functions depending on
183 the caller type (instance or class of models.Field).
184 """
185
186 def __init__(self, class_method, instance_method):
187 self.class_method = class_method
188 self.instance_method = instance_method
189
190 def __get__(self, instance, owner):
191 if instance is None:
192 return functools.partial(self.class_method, owner)
193 return functools.partial(self.instance_method, instance)
194
195
196class RegisterLookupMixin:
197 def _get_lookup(self, lookup_name):
198 return self.get_lookups().get(lookup_name, None)
199
200 @functools.cache
201 def get_class_lookups(cls):
202 class_lookups = [
203 parent.__dict__.get("class_lookups", {}) for parent in inspect.getmro(cls)
204 ]
205 return cls.merge_dicts(class_lookups)
206
207 def get_instance_lookups(self):
208 class_lookups = self.get_class_lookups()
209 if instance_lookups := getattr(self, "instance_lookups", None):
210 return {**class_lookups, **instance_lookups}
211 return class_lookups
212
213 get_lookups = class_or_instance_method(get_class_lookups, get_instance_lookups)
214 get_class_lookups = classmethod(get_class_lookups)
215
216 def get_lookup(self, lookup_name):
217 from plain.models.lookups import Lookup
218
219 found = self._get_lookup(lookup_name)
220 if found is None and hasattr(self, "output_field"):
221 return self.output_field.get_lookup(lookup_name)
222 if found is not None and not issubclass(found, Lookup):
223 return None
224 return found
225
226 def get_transform(self, lookup_name):
227 from plain.models.lookups import Transform
228
229 found = self._get_lookup(lookup_name)
230 if found is None and hasattr(self, "output_field"):
231 return self.output_field.get_transform(lookup_name)
232 if found is not None and not issubclass(found, Transform):
233 return None
234 return found
235
236 @staticmethod
237 def merge_dicts(dicts):
238 """
239 Merge dicts in reverse to preference the order of the original list. e.g.,
240 merge_dicts([a, b]) will preference the keys in 'a' over those in 'b'.
241 """
242 merged = {}
243 for d in reversed(dicts):
244 merged.update(d)
245 return merged
246
247 @classmethod
248 def _clear_cached_class_lookups(cls):
249 for subclass in subclasses(cls):
250 subclass.get_class_lookups.cache_clear()
251
252 def register_class_lookup(cls, lookup, lookup_name=None):
253 if lookup_name is None:
254 lookup_name = lookup.lookup_name
255 if "class_lookups" not in cls.__dict__:
256 cls.class_lookups = {}
257 cls.class_lookups[lookup_name] = lookup
258 cls._clear_cached_class_lookups()
259 return lookup
260
261 def register_instance_lookup(self, lookup, lookup_name=None):
262 if lookup_name is None:
263 lookup_name = lookup.lookup_name
264 if "instance_lookups" not in self.__dict__:
265 self.instance_lookups = {}
266 self.instance_lookups[lookup_name] = lookup
267 return lookup
268
269 register_lookup = class_or_instance_method(
270 register_class_lookup, register_instance_lookup
271 )
272 register_class_lookup = classmethod(register_class_lookup)
273
274 def _unregister_class_lookup(cls, lookup, lookup_name=None):
275 """
276 Remove given lookup from cls lookups. For use in tests only as it's
277 not thread-safe.
278 """
279 if lookup_name is None:
280 lookup_name = lookup.lookup_name
281 del cls.class_lookups[lookup_name]
282 cls._clear_cached_class_lookups()
283
284 def _unregister_instance_lookup(self, lookup, lookup_name=None):
285 """
286 Remove given lookup from instance lookups. For use in tests only as
287 it's not thread-safe.
288 """
289 if lookup_name is None:
290 lookup_name = lookup.lookup_name
291 del self.instance_lookups[lookup_name]
292
293 _unregister_lookup = class_or_instance_method(
294 _unregister_class_lookup, _unregister_instance_lookup
295 )
296 _unregister_class_lookup = classmethod(_unregister_class_lookup)
297
298
299def select_related_descend(field, restricted, requested, select_mask, reverse=False):
300 """
301 Return True if this field should be used to descend deeper for
302 select_related() purposes. Used by both the query construction code
303 (compiler.get_related_selections()) and the model instance creation code
304 (compiler.klass_info).
305
306 Arguments:
307 * field - the field to be checked
308 * restricted - a boolean field, indicating if the field list has been
309 manually restricted using a requested clause)
310 * requested - The select_related() dictionary.
311 * select_mask - the dictionary of selected fields.
312 * reverse - boolean, True if we are checking a reverse select related
313 """
314 if not field.remote_field:
315 return False
316 if field.remote_field.parent_link and not reverse:
317 return False
318 if restricted:
319 if reverse and field.related_query_name() not in requested:
320 return False
321 if not reverse and field.name not in requested:
322 return False
323 if not restricted and field.allow_null:
324 return False
325 if (
326 restricted
327 and select_mask
328 and field.name in requested
329 and field not in select_mask
330 ):
331 raise FieldError(
332 f"Field {field.model._meta.object_name}.{field.name} cannot be both "
333 "deferred and traversed using select_related at the same time."
334 )
335 return True
336
337
338def refs_expression(lookup_parts, annotations):
339 """
340 Check if the lookup_parts contains references to the given annotations set.
341 Because the LOOKUP_SEP is contained in the default annotation names, check
342 each prefix of the lookup_parts for a match.
343 """
344 for n in range(1, len(lookup_parts) + 1):
345 level_n_lookup = LOOKUP_SEP.join(lookup_parts[0:n])
346 if annotations.get(level_n_lookup):
347 return level_n_lookup, lookup_parts[n:]
348 return None, ()
349
350
351def check_rel_lookup_compatibility(model, target_opts, field):
352 """
353 Check that self.model is compatible with target_opts. Compatibility
354 is OK if:
355 1) model and opts match (where proxy inheritance is removed)
356 2) model is parent of opts' model or the other way around
357 """
358
359 def check(opts):
360 return model._meta.concrete_model == opts.concrete_model
361
362 # If the field is a primary key, then doing a query against the field's
363 # model is ok, too. Consider the case:
364 # class Restaurant(models.Model):
365 # place = OneToOneField(Place, primary_key=True):
366 # Restaurant.objects.filter(pk__in=Restaurant.objects.all()).
367 # If we didn't have the primary key check, then pk__in (== place__in) would
368 # give Place's opts as the target opts, but Restaurant isn't compatible
369 # with that. This logic applies only to primary keys, as when doing __in=qs,
370 # we are going to turn this into __in=qs.values('pk') later on.
371 return check(target_opts) or (
372 getattr(field, "primary_key", False) and check(field.model._meta)
373 )
374
375
376class FilteredRelation:
377 """Specify custom filtering in the ON clause of SQL joins."""
378
379 def __init__(self, relation_name, *, condition=Q()):
380 if not relation_name:
381 raise ValueError("relation_name cannot be empty.")
382 self.relation_name = relation_name
383 self.alias = None
384 if not isinstance(condition, Q):
385 raise ValueError("condition argument must be a Q() instance.")
386 self.condition = condition
387 self.path = []
388
389 def __eq__(self, other):
390 if not isinstance(other, self.__class__):
391 return NotImplemented
392 return (
393 self.relation_name == other.relation_name
394 and self.alias == other.alias
395 and self.condition == other.condition
396 )
397
398 def clone(self):
399 clone = FilteredRelation(self.relation_name, condition=self.condition)
400 clone.alias = self.alias
401 clone.path = self.path[:]
402 return clone
403
404 def resolve_expression(self, *args, **kwargs):
405 """
406 QuerySet.annotate() only accepts expression-like arguments
407 (with a resolve_expression() method).
408 """
409 raise NotImplementedError("FilteredRelation.resolve_expression() is unused.")
410
411 def as_sql(self, compiler, connection):
412 # Resolve the condition in Join.filtered_relation.
413 query = compiler.query
414 where = query.build_filtered_relation_q(self.condition, reuse=set(self.path))
415 return compiler.compile(where)