plain.models
Model your data and store it in a database.
# app/users/models.py
from plain import models
from plain.passwords.models import PasswordField
class User(models.Model):
    """Example model: a user identified by a unique email address."""

    email = models.EmailField(unique=True)
    password = PasswordField()
    is_staff = models.BooleanField(default=False)
    # Populated automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.email
Create, update, and delete instances of your models:
from .models import User
# Create a new user
user = User.objects.create(
    email="user@example.com",
    password="password",
)

# Update a user
user.email = "new@example.com"
user.save()

# Delete a user
user.delete()

# Query for users
staff_users = User.objects.filter(is_staff=True)
Installation
# app/settings.py
# Add "plain.models" to the project's installed packages; the `...` stands in
# for whatever entries the list already contains.
INSTALLED_PACKAGES = [
    ...
    "plain.models",
]
To connect to a database, you can provide a `DATABASE_URL` environment variable:
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
Or you can manually define the `DATABASES` setting:
# app/settings.py
# Explicit alternative to the DATABASE_URL environment variable. These values
# describe the same connection as
# postgresql://user:password@localhost:5432/dbname.
DATABASES = {
    "default": {
        "ENGINE": "plain.models.backends.postgresql",
        "NAME": "dbname",
        "USER": "user",
        "PASSWORD": "password",
        "HOST": "localhost",
        "PORT": "5432",
    }
}
Multiple backends are supported, including Postgres, MySQL, and SQLite.
Querying
Migrations
Fields
Validation
Indexes and constraints
Managers
Forms
"""
Classes to represent the definitions of aggregate functions.
"""
from plain.exceptions import FieldError, FullResultSet
from plain.models.expressions import Case, Func, Star, Value, When
from plain.models.fields import IntegerField
from plain.models.functions.comparison import Coalesce
from plain.models.functions.mixins import (
    FixDurationInputMixin,
    NumericOutputFieldMixin,
)

# Public API of this module.
__all__ = [
    "Aggregate",
    "Avg",
    "Count",
    "Max",
    "Min",
    "StdDev",
    "Sum",
    "Variance",
]


class Aggregate(Func):
    """Base expression for SQL aggregate functions.

    On top of ``Func`` this adds three optional features:

    * ``distinct`` - emit ``DISTINCT`` inside the call (only when the
      subclass sets ``allow_distinct = True``).
    * ``filter`` - restrict the aggregated rows, rendered either as a
      ``FILTER (WHERE ...)`` clause or as a ``CASE WHEN`` wrapper around the
      first source expression, depending on backend support.
    * ``default`` - a fallback value; when given, the resolved aggregate is
      wrapped in ``Coalesce``.
    """

    # %(distinct)s is filled in by as_sql() with "DISTINCT " or "".
    template = "%(function)s(%(distinct)s%(expressions)s)"
    contains_aggregate = True
    # Display name used in error messages and default_alias; set by subclasses.
    name = None
    # Wraps the function template; %%(filter)s survives the first
    # %-interpolation as %(filter)s and is filled in when the SQL is rendered.
    filter_template = "%s FILTER (WHERE %%(filter)s)"
    window_compatible = True
    # Subclasses opt in to accepting distinct=True.
    allow_distinct = False
    # When a subclass sets this to a non-None value, passing `default` is
    # rejected in __init__().
    empty_result_set_value = None

    def __init__(
        self, *expressions, distinct=False, filter=None, default=None, **extra
    ):
        """Store the aggregate options and delegate the rest to ``Func``.

        Raises:
            TypeError: if ``distinct`` is requested but the class does not
                allow it, or if ``default`` is given while the class defines
                a non-None ``empty_result_set_value``.
        """
        if distinct and not self.allow_distinct:
            raise TypeError(f"{self.__class__.__name__} does not allow distinct.")
        if default is not None and self.empty_result_set_value is not None:
            raise TypeError(f"{self.__class__.__name__} does not allow default.")
        self.distinct = distinct
        self.filter = filter
        self.default = default
        super().__init__(*expressions, **extra)

    def get_source_fields(self):
        """Return the output fields of the source expressions, minus the filter."""
        # Don't return the filter expression since it's not a source field.
        return [e._output_field_or_none for e in super().get_source_expressions()]

    def get_source_expressions(self):
        """Return the source expressions, with the filter appended if set."""
        source_expressions = super().get_source_expressions()
        if self.filter:
            return source_expressions + [self.filter]
        return source_expressions

    def set_source_expressions(self, exprs):
        """Inverse of get_source_expressions().

        When a filter is set, the last item of ``exprs`` is taken as the new
        filter. Note that it is popped from the list that was passed in.
        """
        self.filter = self.filter and exprs.pop()
        return super().set_source_expressions(exprs)

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """Resolve this aggregate, its filter, and its default.

        Returns the resolved copy, or a ``Coalesce`` wrapping it when a
        ``default`` was supplied.

        Raises:
            FieldError: if, when not summarizing, one of the source
                expressions is itself an aggregate.
        """
        # Aggregates are not allowed in UPDATE queries, so ignore for_save
        c = super().resolve_expression(query, allow_joins, reuse, summarize)
        c.filter = c.filter and c.filter.resolve_expression(
            query, allow_joins, reuse, summarize
        )
        if not summarize:
            # Call Aggregate.get_source_expressions() to avoid
            # returning self.filter and including that in this loop.
            expressions = super(Aggregate, c).get_source_expressions()
            for index, expr in enumerate(expressions):
                if expr.contains_aggregate:
                    # Report the expression as the caller wrote it, i.e. the
                    # unresolved one at the same position on `self`.
                    before_resolved = self.get_source_expressions()[index]
                    name = (
                        before_resolved.name
                        if hasattr(before_resolved, "name")
                        else repr(before_resolved)
                    )
                    raise FieldError(
                        f"Cannot compute {c.name}('{name}'): '{name}' is an aggregate"
                    )
        if (default := c.default) is None:
            return c
        if hasattr(default, "resolve_expression"):
            default = default.resolve_expression(query, allow_joins, reuse, summarize)
            if default._output_field_or_none is None:
                # Let an untyped default inherit the aggregate's output field.
                default.output_field = c._output_field_or_none
        else:
            # Plain Python value: wrap it so it can be used as an expression.
            default = Value(default, c._output_field_or_none)
        c.default = None  # Reset the default argument before wrapping.
        coalesce = Coalesce(c, default, output_field=c._output_field_or_none)
        coalesce.is_summary = c.is_summary
        return coalesce

    @property
    def default_alias(self):
        """Return ``"<expression name>__<aggregate name, lowercased>"``.

        Only available when there is exactly one source expression (the
        filter counts as one) and it has a ``name`` attribute.

        Raises:
            TypeError: for anything more complex, which needs an explicit alias.
        """
        expressions = self.get_source_expressions()
        if len(expressions) == 1 and hasattr(expressions[0], "name"):
            return f"{expressions[0].name}__{self.name.lower()}"
        raise TypeError("Complex expressions require an alias")

    def get_group_by_cols(self):
        """Return an empty list of GROUP BY columns for this expression."""
        return []

    def as_sql(self, compiler, connection, **extra_context):
        """Render the aggregate as ``(sql, params)``.

        With a filter set, the backend's
        ``supports_aggregate_filter_clause`` feature flag chooses between a
        native ``FILTER (WHERE ...)`` clause and a ``CASE WHEN`` wrapper
        around the first source expression.
        """
        extra_context["distinct"] = "DISTINCT " if self.distinct else ""
        if self.filter:
            if connection.features.supports_aggregate_filter_clause:
                try:
                    filter_sql, filter_params = self.filter.as_sql(compiler, connection)
                except FullResultSet:
                    # The filter matches every row, so it can be omitted:
                    # fall through to the unfiltered rendering below.
                    pass
                else:
                    # pop() rather than get(): the combined template is passed
                    # explicitly below, and leaving a caller-supplied
                    # "template" in extra_context would raise TypeError for a
                    # duplicated keyword argument.
                    template = self.filter_template % extra_context.pop(
                        "template", self.template
                    )
                    sql, params = super().as_sql(
                        compiler,
                        connection,
                        template=template,
                        filter=filter_sql,
                        **extra_context,
                    )
                    return sql, (*params, *filter_params)
            else:
                # No FILTER support: aggregate over
                # CASE WHEN <filter> THEN <first expression> END instead.
                copy = self.copy()
                copy.filter = None
                source_expressions = copy.get_source_expressions()
                condition = When(self.filter, then=source_expressions[0])
                copy.set_source_expressions([Case(condition)] + source_expressions[1:])
                return super(Aggregate, copy).as_sql(
                    compiler, connection, **extra_context
                )
        return super().as_sql(compiler, connection, **extra_context)

    def _get_repr_options(self):
        """Extend the parent's repr options with ``distinct`` and ``filter`` when set."""
        options = super()._get_repr_options()
        if self.distinct:
            options["distinct"] = self.distinct
        if self.filter:
            options["filter"] = self.filter
        return options


class Avg(FixDurationInputMixin, NumericOutputFieldMixin, Aggregate):
    """SQL ``AVG`` aggregate; accepts ``distinct=True``."""

    function = "AVG"
    name = "Avg"
    allow_distinct = True


class Count(Aggregate):
    """SQL ``COUNT`` aggregate with an integer output field.

    Accepts ``distinct=True``. Because ``empty_result_set_value`` is ``0``
    (not None), ``Aggregate.__init__`` rejects a ``default`` argument.
    """

    function = "COUNT"
    name = "Count"
    output_field = IntegerField()
    allow_distinct = True
    empty_result_set_value = 0

    def __init__(self, expression, filter=None, **extra):
        """Build the count; the string ``"*"`` is shorthand for ``Star()``.

        Raises:
            ValueError: if a ``Star`` expression is combined with ``filter``.
        """
        if expression == "*":
            expression = Star()
        if isinstance(expression, Star) and filter is not None:
            raise ValueError("Star cannot be used with filter. Please specify a field.")
        super().__init__(expression, filter=filter, **extra)


class Max(Aggregate):
    """SQL ``MAX`` aggregate."""

    function = "MAX"
    name = "Max"


class Min(Aggregate):
    """SQL ``MIN`` aggregate."""

    function = "MIN"
    name = "Min"


class StdDev(NumericOutputFieldMixin, Aggregate):
    """Standard deviation aggregate.

    Uses ``STDDEV_SAMP`` when ``sample=True`` and ``STDDEV_POP`` otherwise.
    """

    name = "StdDev"

    def __init__(self, expression, sample=False, **extra):
        # The SQL function is chosen per instance from the `sample` flag.
        self.function = "STDDEV_SAMP" if sample else "STDDEV_POP"
        super().__init__(expression, **extra)

    def _get_repr_options(self):
        """Add ``sample`` to the repr options, derived from the chosen function."""
        return {**super()._get_repr_options(), "sample": self.function == "STDDEV_SAMP"}


class Sum(FixDurationInputMixin, Aggregate):
    """SQL ``SUM`` aggregate; accepts ``distinct=True``."""

    function = "SUM"
    name = "Sum"
    allow_distinct = True


class Variance(NumericOutputFieldMixin, Aggregate):
    """Variance aggregate.

    Uses ``VAR_SAMP`` when ``sample=True`` and ``VAR_POP`` otherwise.
    """

    name = "Variance"

    def __init__(self, expression, sample=False, **extra):
        # The SQL function is chosen per instance from the `sample` flag.
        self.function = "VAR_SAMP" if sample else "VAR_POP"
        super().__init__(expression, **extra)

    def _get_repr_options(self):
        """Add ``sample`` to the repr options, derived from the chosen function."""
        return {**super()._get_repr_options(), "sample": self.function == "VAR_SAMP"}