plain.models
Model your data and store it in a database.
# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField
class User(models.Model):
email = models.EmailField()
password = PasswordField()
is_admin = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.email
Create, update, and delete instances of your models:
from .models import User
# Create a new user
user = User.objects.create(
email="[email protected]",
password="password",
)
# Update a user
user.email = "[email protected]"
user.save()
# Delete a user
user.delete()
# Query for users
admin_users = User.objects.filter(is_admin=True)
Installation
# app/settings.py
INSTALLED_PACKAGES = [
...
"plain.models",
]
To connect to a database, you can provide a DATABASE_URL
environment variable.
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
Or you can manually define the DATABASES
setting.
# app/settings.py
DATABASES = {
"default": {
"ENGINE": "plain.models.backends.postgresql",
"NAME": "dbname",
"USER": "user",
"PASSWORD": "password",
"HOST": "localhost",
"PORT": "5432",
}
}
Multiple backends are supported, including Postgres, MySQL, and SQLite.
Querying
Migrations
Fields
Validation
Indexes and constraints
Managers
Forms
1from enum import Enum
2from types import NoneType
3
4from plain.exceptions import FieldError, ValidationError
5from plain.models.db import DEFAULT_DB_ALIAS, connections
6from plain.models.expressions import Exists, ExpressionList, F, OrderBy
7from plain.models.indexes import IndexExpression
8from plain.models.lookups import Exact
9from plain.models.query_utils import Q
10from plain.models.sql.query import Query
11
12__all__ = ["BaseConstraint", "CheckConstraint", "Deferrable", "UniqueConstraint"]
13
14
15class BaseConstraint:
16 default_violation_error_message = "Constraint โ%(name)sโ is violated."
17 violation_error_code = None
18 violation_error_message = None
19
20 def __init__(
21 self, *, name, violation_error_code=None, violation_error_message=None
22 ):
23 self.name = name
24 if violation_error_code is not None:
25 self.violation_error_code = violation_error_code
26 if violation_error_message is not None:
27 self.violation_error_message = violation_error_message
28 else:
29 self.violation_error_message = self.default_violation_error_message
30
31 @property
32 def contains_expressions(self):
33 return False
34
35 def constraint_sql(self, model, schema_editor):
36 raise NotImplementedError("This method must be implemented by a subclass.")
37
38 def create_sql(self, model, schema_editor):
39 raise NotImplementedError("This method must be implemented by a subclass.")
40
41 def remove_sql(self, model, schema_editor):
42 raise NotImplementedError("This method must be implemented by a subclass.")
43
44 def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
45 raise NotImplementedError("This method must be implemented by a subclass.")
46
47 def get_violation_error_message(self):
48 return self.violation_error_message % {"name": self.name}
49
50 def deconstruct(self):
51 path = f"{self.__class__.__module__}.{self.__class__.__name__}"
52 path = path.replace("plain.models.constraints", "plain.models")
53 kwargs = {"name": self.name}
54 if (
55 self.violation_error_message is not None
56 and self.violation_error_message != self.default_violation_error_message
57 ):
58 kwargs["violation_error_message"] = self.violation_error_message
59 if self.violation_error_code is not None:
60 kwargs["violation_error_code"] = self.violation_error_code
61 return (path, (), kwargs)
62
63 def clone(self):
64 _, args, kwargs = self.deconstruct()
65 return self.__class__(*args, **kwargs)
66
67
68class CheckConstraint(BaseConstraint):
69 def __init__(
70 self, *, check, name, violation_error_code=None, violation_error_message=None
71 ):
72 self.check = check
73 if not getattr(check, "conditional", False):
74 raise TypeError(
75 "CheckConstraint.check must be a Q instance or boolean expression."
76 )
77 super().__init__(
78 name=name,
79 violation_error_code=violation_error_code,
80 violation_error_message=violation_error_message,
81 )
82
83 def _get_check_sql(self, model, schema_editor):
84 query = Query(model=model, alias_cols=False)
85 where = query.build_where(self.check)
86 compiler = query.get_compiler(connection=schema_editor.connection)
87 sql, params = where.as_sql(compiler, schema_editor.connection)
88 return sql % tuple(schema_editor.quote_value(p) for p in params)
89
90 def constraint_sql(self, model, schema_editor):
91 check = self._get_check_sql(model, schema_editor)
92 return schema_editor._check_sql(self.name, check)
93
94 def create_sql(self, model, schema_editor):
95 check = self._get_check_sql(model, schema_editor)
96 return schema_editor._create_check_sql(model, self.name, check)
97
98 def remove_sql(self, model, schema_editor):
99 return schema_editor._delete_check_sql(model, self.name)
100
101 def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
102 against = instance._get_field_value_map(meta=model._meta, exclude=exclude)
103 try:
104 if not Q(self.check).check(against, using=using):
105 raise ValidationError(
106 self.get_violation_error_message(), code=self.violation_error_code
107 )
108 except FieldError:
109 pass
110
111 def __repr__(self):
112 return "<{}: check={} name={}{}{}>".format(
113 self.__class__.__qualname__,
114 self.check,
115 repr(self.name),
116 (
117 ""
118 if self.violation_error_code is None
119 else f" violation_error_code={self.violation_error_code!r}"
120 ),
121 (
122 ""
123 if self.violation_error_message is None
124 or self.violation_error_message == self.default_violation_error_message
125 else f" violation_error_message={self.violation_error_message!r}"
126 ),
127 )
128
129 def __eq__(self, other):
130 if isinstance(other, CheckConstraint):
131 return (
132 self.name == other.name
133 and self.check == other.check
134 and self.violation_error_code == other.violation_error_code
135 and self.violation_error_message == other.violation_error_message
136 )
137 return super().__eq__(other)
138
139 def deconstruct(self):
140 path, args, kwargs = super().deconstruct()
141 kwargs["check"] = self.check
142 return path, args, kwargs
143
144
145class Deferrable(Enum):
146 DEFERRED = "deferred"
147 IMMEDIATE = "immediate"
148
149 # A similar format was proposed for Python 3.10.
150 def __repr__(self):
151 return f"{self.__class__.__qualname__}.{self._name_}"
152
153
154class UniqueConstraint(BaseConstraint):
155 def __init__(
156 self,
157 *expressions,
158 fields=(),
159 name=None,
160 condition=None,
161 deferrable=None,
162 include=None,
163 opclasses=(),
164 violation_error_code=None,
165 violation_error_message=None,
166 ):
167 if not name:
168 raise ValueError("A unique constraint must be named.")
169 if not expressions and not fields:
170 raise ValueError(
171 "At least one field or expression is required to define a "
172 "unique constraint."
173 )
174 if expressions and fields:
175 raise ValueError(
176 "UniqueConstraint.fields and expressions are mutually exclusive."
177 )
178 if not isinstance(condition, NoneType | Q):
179 raise ValueError("UniqueConstraint.condition must be a Q instance.")
180 if condition and deferrable:
181 raise ValueError("UniqueConstraint with conditions cannot be deferred.")
182 if include and deferrable:
183 raise ValueError("UniqueConstraint with include fields cannot be deferred.")
184 if opclasses and deferrable:
185 raise ValueError("UniqueConstraint with opclasses cannot be deferred.")
186 if expressions and deferrable:
187 raise ValueError("UniqueConstraint with expressions cannot be deferred.")
188 if expressions and opclasses:
189 raise ValueError(
190 "UniqueConstraint.opclasses cannot be used with expressions. "
191 "Use a custom OpClass() instead."
192 )
193 if not isinstance(deferrable, NoneType | Deferrable):
194 raise ValueError(
195 "UniqueConstraint.deferrable must be a Deferrable instance."
196 )
197 if not isinstance(include, NoneType | list | tuple):
198 raise ValueError("UniqueConstraint.include must be a list or tuple.")
199 if not isinstance(opclasses, list | tuple):
200 raise ValueError("UniqueConstraint.opclasses must be a list or tuple.")
201 if opclasses and len(fields) != len(opclasses):
202 raise ValueError(
203 "UniqueConstraint.fields and UniqueConstraint.opclasses must "
204 "have the same number of elements."
205 )
206 self.fields = tuple(fields)
207 self.condition = condition
208 self.deferrable = deferrable
209 self.include = tuple(include) if include else ()
210 self.opclasses = opclasses
211 self.expressions = tuple(
212 F(expression) if isinstance(expression, str) else expression
213 for expression in expressions
214 )
215 super().__init__(
216 name=name,
217 violation_error_code=violation_error_code,
218 violation_error_message=violation_error_message,
219 )
220
221 @property
222 def contains_expressions(self):
223 return bool(self.expressions)
224
225 def _get_condition_sql(self, model, schema_editor):
226 if self.condition is None:
227 return None
228 query = Query(model=model, alias_cols=False)
229 where = query.build_where(self.condition)
230 compiler = query.get_compiler(connection=schema_editor.connection)
231 sql, params = where.as_sql(compiler, schema_editor.connection)
232 return sql % tuple(schema_editor.quote_value(p) for p in params)
233
234 def _get_index_expressions(self, model, schema_editor):
235 if not self.expressions:
236 return None
237 index_expressions = []
238 for expression in self.expressions:
239 index_expression = IndexExpression(expression)
240 index_expression.set_wrapper_classes(schema_editor.connection)
241 index_expressions.append(index_expression)
242 return ExpressionList(*index_expressions).resolve_expression(
243 Query(model, alias_cols=False),
244 )
245
246 def constraint_sql(self, model, schema_editor):
247 fields = [model._meta.get_field(field_name) for field_name in self.fields]
248 include = [
249 model._meta.get_field(field_name).column for field_name in self.include
250 ]
251 condition = self._get_condition_sql(model, schema_editor)
252 expressions = self._get_index_expressions(model, schema_editor)
253 return schema_editor._unique_sql(
254 model,
255 fields,
256 self.name,
257 condition=condition,
258 deferrable=self.deferrable,
259 include=include,
260 opclasses=self.opclasses,
261 expressions=expressions,
262 )
263
264 def create_sql(self, model, schema_editor):
265 fields = [model._meta.get_field(field_name) for field_name in self.fields]
266 include = [
267 model._meta.get_field(field_name).column for field_name in self.include
268 ]
269 condition = self._get_condition_sql(model, schema_editor)
270 expressions = self._get_index_expressions(model, schema_editor)
271 return schema_editor._create_unique_sql(
272 model,
273 fields,
274 self.name,
275 condition=condition,
276 deferrable=self.deferrable,
277 include=include,
278 opclasses=self.opclasses,
279 expressions=expressions,
280 )
281
282 def remove_sql(self, model, schema_editor):
283 condition = self._get_condition_sql(model, schema_editor)
284 include = [
285 model._meta.get_field(field_name).column for field_name in self.include
286 ]
287 expressions = self._get_index_expressions(model, schema_editor)
288 return schema_editor._delete_unique_sql(
289 model,
290 self.name,
291 condition=condition,
292 deferrable=self.deferrable,
293 include=include,
294 opclasses=self.opclasses,
295 expressions=expressions,
296 )
297
298 def __repr__(self):
299 return "<{}:{}{}{}{}{}{}{}{}{}>".format(
300 self.__class__.__qualname__,
301 "" if not self.fields else f" fields={repr(self.fields)}",
302 "" if not self.expressions else f" expressions={repr(self.expressions)}",
303 f" name={repr(self.name)}",
304 "" if self.condition is None else f" condition={self.condition}",
305 "" if self.deferrable is None else f" deferrable={self.deferrable!r}",
306 "" if not self.include else f" include={repr(self.include)}",
307 "" if not self.opclasses else f" opclasses={repr(self.opclasses)}",
308 (
309 ""
310 if self.violation_error_code is None
311 else f" violation_error_code={self.violation_error_code!r}"
312 ),
313 (
314 ""
315 if self.violation_error_message is None
316 or self.violation_error_message == self.default_violation_error_message
317 else f" violation_error_message={self.violation_error_message!r}"
318 ),
319 )
320
321 def __eq__(self, other):
322 if isinstance(other, UniqueConstraint):
323 return (
324 self.name == other.name
325 and self.fields == other.fields
326 and self.condition == other.condition
327 and self.deferrable == other.deferrable
328 and self.include == other.include
329 and self.opclasses == other.opclasses
330 and self.expressions == other.expressions
331 and self.violation_error_code == other.violation_error_code
332 and self.violation_error_message == other.violation_error_message
333 )
334 return super().__eq__(other)
335
336 def deconstruct(self):
337 path, args, kwargs = super().deconstruct()
338 if self.fields:
339 kwargs["fields"] = self.fields
340 if self.condition:
341 kwargs["condition"] = self.condition
342 if self.deferrable:
343 kwargs["deferrable"] = self.deferrable
344 if self.include:
345 kwargs["include"] = self.include
346 if self.opclasses:
347 kwargs["opclasses"] = self.opclasses
348 return path, self.expressions, kwargs
349
350 def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
351 queryset = model._default_manager.using(using)
352 if self.fields:
353 lookup_kwargs = {}
354 for field_name in self.fields:
355 if exclude and field_name in exclude:
356 return
357 field = model._meta.get_field(field_name)
358 lookup_value = getattr(instance, field.attname)
359 if lookup_value is None or (
360 lookup_value == ""
361 and connections[using].features.interprets_empty_strings_as_nulls
362 ):
363 # A composite constraint containing NULL value cannot cause
364 # a violation since NULL != NULL in SQL.
365 return
366 lookup_kwargs[field.name] = lookup_value
367 queryset = queryset.filter(**lookup_kwargs)
368 else:
369 # Ignore constraints with excluded fields.
370 if exclude:
371 for expression in self.expressions:
372 if hasattr(expression, "flatten"):
373 for expr in expression.flatten():
374 if isinstance(expr, F) and expr.name in exclude:
375 return
376 elif isinstance(expression, F) and expression.name in exclude:
377 return
378 replacements = {
379 F(field): value
380 for field, value in instance._get_field_value_map(
381 meta=model._meta, exclude=exclude
382 ).items()
383 }
384 expressions = []
385 for expr in self.expressions:
386 # Ignore ordering.
387 if isinstance(expr, OrderBy):
388 expr = expr.expression
389 expressions.append(Exact(expr, expr.replace_expressions(replacements)))
390 queryset = queryset.filter(*expressions)
391 model_class_pk = instance._get_pk_val(model._meta)
392 if not instance._state.adding and model_class_pk is not None:
393 queryset = queryset.exclude(pk=model_class_pk)
394 if not self.condition:
395 if queryset.exists():
396 if self.expressions:
397 raise ValidationError(
398 self.get_violation_error_message(),
399 code=self.violation_error_code,
400 )
401 # When fields are defined, use the unique_error_message() for
402 # backward compatibility.
403 for model, constraints in instance.get_constraints():
404 for constraint in constraints:
405 if constraint is self:
406 raise ValidationError(
407 instance.unique_error_message(model, self.fields),
408 )
409 else:
410 against = instance._get_field_value_map(meta=model._meta, exclude=exclude)
411 try:
412 if (self.condition & Exists(queryset.filter(self.condition))).check(
413 against, using=using
414 ):
415 raise ValidationError(
416 self.get_violation_error_message(),
417 code=self.violation_error_code,
418 )
419 except FieldError:
420 pass